- add .h files for new mesh api
[oweals/gnunet.git] / src / mesh / mesh_api_enc.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4      GNUnet is free software; you can redistribute it and/or modify
5      it under the terms of the GNU General Public License as published
6      by the Free Software Foundation; either version 3, or (at your
7      option) any later version.
8      GNUnet is distributed in the hope that it will be useful, but
9      WITHOUT ANY WARRANTY; without even the implied warranty of
10      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11      General Public License for more details.
12      You should have received a copy of the GNU General Public License
13      along with GNUnet; see the file COPYING.  If not, write to the
14      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
15      Boston, MA 02111-1307, USA.
16 */
17
18 /**
19  * @file mesh/mesh_api_enc.c
20  * @brief mesh api: client implementation of new mesh service
21  * @author Bartlomiej Polot
22  */
23
24 #include "platform.h"
25 #include "gnunet_common.h"
26 #include "gnunet_client_lib.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_peer_lib.h"
29 #include "gnunet_mesh_service_enc.h"
30 #include "mesh.h"
31 #include "mesh_protocol.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "mesh-api",__VA_ARGS__)
34 #define DEBUG_ACK GNUNET_YES
35
36 /******************************************************************************/
37 /************************      DATA STRUCTURES     ****************************/
38 /******************************************************************************/
39
40 /**
41  * Transmission queue to the service
42  */
43 struct GNUNET_MESH_TransmitHandle
44 {
45
46     /**
47      * Double Linked list
48      */
49   struct GNUNET_MESH_TransmitHandle *next;
50
51     /**
52      * Double Linked list
53      */
54   struct GNUNET_MESH_TransmitHandle *prev;
55
56     /**
57      * Tunnel this message is sent on / for (may be NULL for control messages).
58      */
59   struct GNUNET_MESH_Tunnel *tunnel;
60
61     /**
62      * Callback to obtain the message to transmit, or NULL if we
63      * got the message in 'data'.  Notice that messages built
64      * by 'notify' need to be encapsulated with information about
65      * the 'target'.
66      */
67   GNUNET_CONNECTION_TransmitReadyNotify notify;
68
69     /**
70      * Closure for 'notify'
71      */
72   void *notify_cls;
73
74     /**
75      * How long is this message valid.  Once the timeout has been
76      * reached, the message must no longer be sent.  If this
77      * is a message with a 'notify' callback set, the 'notify'
78      * function should be called with 'buf' NULL and size 0.
79      */
80   struct GNUNET_TIME_Absolute timeout;
81
82     /**
83      * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER.
84      */
85   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
86
87     /**
88      * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL.
89      */
90   size_t size;
91 };
92
93
94 /**
95  * Opaque handle to the service.
96  */
97 struct GNUNET_MESH_Handle
98 {
99
100     /**
101      * Handle to the server connection, to send messages later
102      */
103   struct GNUNET_CLIENT_Connection *client;
104
105     /**
106      * Set of handlers used for processing incoming messages in the tunnels
107      */
108   const struct GNUNET_MESH_MessageHandler *message_handlers;
109
110   /**
111    * Number of handlers in the handlers array.
112    */
113   unsigned int n_handlers;
114
115   /**
116    * Ports open.
117    */
118   const uint32_t *ports;
119
120   /**
121    * Number of ports.
122    */
123   unsigned int n_ports;
124
125     /**
126      * Double linked list of the tunnels this client is connected to, head.
127      */
128   struct GNUNET_MESH_Tunnel *tunnels_head;
129
130     /**
131      * Double linked list of the tunnels this client is connected to, tail.
132      */
133   struct GNUNET_MESH_Tunnel *tunnels_tail;
134
135     /**
136      * Callback for inbound tunnel creation
137      */
138   GNUNET_MESH_InboundTunnelNotificationHandler *new_tunnel;
139
140     /**
141      * Callback for inbound tunnel disconnection
142      */
143   GNUNET_MESH_TunnelEndHandler *cleaner;
144
145     /**
146      * Handle to cancel pending transmissions in case of disconnection
147      */
148   struct GNUNET_CLIENT_TransmitHandle *th;
149
150     /**
151      * Closure for all the handlers given by the client
152      */
153   void *cls;
154
155     /**
156      * Messages to send to the service, head.
157      */
158   struct GNUNET_MESH_TransmitHandle *th_head;
159
160     /**
161      * Messages to send to the service, tail.
162      */
163   struct GNUNET_MESH_TransmitHandle *th_tail;
164
165     /**
166      * tid of the next tunnel to create (to avoid reusing IDs often)
167      */
168   MESH_TunnelNumber next_tid;
169
170     /**
171      * Have we started the task to receive messages from the service
172      * yet? We do this after we send the 'MESH_LOCAL_CONNECT' message.
173      */
174   int in_receive;
175
176   /**
177    * Configuration given by the client, in case of reconnection
178    */
179   const struct GNUNET_CONFIGURATION_Handle *cfg;
180
181   /**
182    * Time to the next reconnect in case one reconnect fails
183    */
184   struct GNUNET_TIME_Relative reconnect_time;
185   
186   /**
187    * Task for trying to reconnect.
188    */
189   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
190
191   /**
192    * Monitor callback
193    */
194   GNUNET_MESH_TunnelsCB tunnels_cb;
195
196   /**
197    * Monitor callback closure.
198    */
199   void *tunnels_cls;
200
201   /**
202    * Tunnel callback.
203    */
204   GNUNET_MESH_TunnelCB tunnel_cb;
205
206   /**
207    * Tunnel callback closure.
208    */
209   void *tunnel_cls;
210
211 #if DEBUG_ACK
212   unsigned int acks_sent;
213   unsigned int acks_recv;
214 #endif
215 };
216
217
218 /**
219  * Description of a peer
220  */
221 struct GNUNET_MESH_Peer
222 {
223     /**
224      * ID of the peer in short form
225      */
226   GNUNET_PEER_Id id;
227
228   /**
229    * Tunnel this peer belongs to
230    */
231   struct GNUNET_MESH_Tunnel *t;
232
233   /**
234    * Flag indicating whether service has informed about its connection
235    */
236   int connected;
237
238 };
239
240
241 /**
242  * Opaque handle to a tunnel.
243  */
244 struct GNUNET_MESH_Tunnel
245 {
246
247     /**
248      * DLL next
249      */
250   struct GNUNET_MESH_Tunnel *next;
251
252     /**
253      * DLL prev
254      */
255   struct GNUNET_MESH_Tunnel *prev;
256
257     /**
258      * Handle to the mesh this tunnel belongs to
259      */
260   struct GNUNET_MESH_Handle *mesh;
261
262     /**
263      * Local ID of the tunnel
264      */
265   MESH_TunnelNumber tid;
266
267     /**
268      * Port number.
269      */
270   uint32_t port;
271
272     /**
273      * Other end of the tunnel.
274      */
275   GNUNET_PEER_Id peer;
276
277   /**
278    * Any data the caller wants to put in here
279    */
280   void *ctx;
281
282     /**
283      * Size of packet queued in this tunnel
284      */
285   unsigned int packet_size;
286
287     /**
288      * Is the tunnel allowed to buffer?
289      */
290   int nobuffer;
291
292     /**
293      * Is the tunnel realiable?
294      */
295   int reliable;
296
297     /**
298      * If reliable, is the tunnel out of order?
299      */
300   int ooorder;
301
302     /**
303      * Are we allowed to send to the service?
304      */
305   int allow_send;
306
307 };
308
309
310 /**
311  * Implementation state for mesh's message queue.
312  */
313 struct MeshMQState
314 {
315   /**
316    * The current transmit handle, or NULL
317    * if no transmit is active.
318    */
319   struct GNUNET_MESH_TransmitHandle *th;
320
321   /**
322    * Tunnel to send the data over.
323    */
324   struct GNUNET_MESH_Tunnel *tunnel;
325 };
326
327
328 /******************************************************************************/
329 /***********************         DECLARATIONS         *************************/
330 /******************************************************************************/
331
332 /**
333  * Function called to send a message to the service.
334  * "buf" will be NULL and "size" zero if the socket was closed for writing in
335  * the meantime.
336  *
337  * @param cls closure, the mesh handle
338  * @param size number of bytes available in buf
339  * @param buf where the callee should write the connect message
340  * @return number of bytes written to buf
341  */
342 static size_t
343 send_callback (void *cls, size_t size, void *buf);
344
345
346 /******************************************************************************/
347 /***********************     AUXILIARY FUNCTIONS      *************************/
348 /******************************************************************************/
349
350 /**
351  * Check if transmission is a payload packet.
352  *
353  * @param th Transmission handle.
354  *
355  * @return GNUNET_YES if it is a payload packet,
356  *         GNUNET_NO if it is a mesh management packet.
357  */
358 static int
359 th_is_payload (struct GNUNET_MESH_TransmitHandle *th)
360 {
361   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
362 }
363
364
365 /**
366  * Check whether there is any message ready in the queue and find the size.
367  * 
368  * @param h Mesh handle.
369  * 
370  * @return The size of the first ready message in the queue,
371  *         0 if there is none.
372  */
373 static size_t
374 message_ready_size (struct GNUNET_MESH_Handle *h)
375 {
376   struct GNUNET_MESH_TransmitHandle *th;
377   struct GNUNET_MESH_Tunnel *t;
378
379   for (th = h->th_head; NULL != th; th = th->next)
380   {
381     t = th->tunnel;
382     if (GNUNET_NO == th_is_payload (th))
383     {
384       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message internal\n");
385       return th->size;
386     }
387     if (GNUNET_YES == t->allow_send)
388     {
389       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message payload ok\n");
390       return th->size;
391     }
392   }
393   return 0;
394 }
395
396
397 /**
398  * Get the tunnel handler for the tunnel specified by id from the given handle
399  * @param h Mesh handle
400  * @param tid ID of the wanted tunnel
401  * @return handle to the required tunnel or NULL if not found
402  */
403 static struct GNUNET_MESH_Tunnel *
404 retrieve_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelNumber tid)
405 {
406   struct GNUNET_MESH_Tunnel *t;
407
408   t = h->tunnels_head;
409   while (t != NULL)
410   {
411     if (t->tid == tid)
412       return t;
413     t = t->next;
414   }
415   return NULL;
416 }
417
418
419 /**
420  * Create a new tunnel and insert it in the tunnel list of the mesh handle
421  * @param h Mesh handle
422  * @param tid desired tid of the tunnel, 0 to assign one automatically
423  * @return handle to the created tunnel
424  */
425 static struct GNUNET_MESH_Tunnel *
426 create_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelNumber tid)
427 {
428   struct GNUNET_MESH_Tunnel *t;
429
430   t = GNUNET_malloc (sizeof (struct GNUNET_MESH_Tunnel));
431   GNUNET_CONTAINER_DLL_insert (h->tunnels_head, h->tunnels_tail, t);
432   t->mesh = h;
433   if (0 == tid)
434   {
435     t->tid = h->next_tid;
436     while (NULL != retrieve_tunnel (h, h->next_tid))
437     {
438       h->next_tid++;
439       h->next_tid &= ~GNUNET_MESH_LOCAL_TUNNEL_ID_SERV;
440       h->next_tid |= GNUNET_MESH_LOCAL_TUNNEL_ID_CLI;
441     }
442   }
443   else
444   {
445     t->tid = tid;
446   }
447   t->allow_send = GNUNET_NO;
448   t->nobuffer = GNUNET_NO;
449   return t;
450 }
451
452
453 /**
454  * Destroy the specified tunnel.
455  * - Destroys all peers, calling the disconnect callback on each if needed
456  * - Cancels all outgoing traffic for that tunnel, calling respective notifys
457  * - Calls cleaner if tunnel was inbound
458  * - Frees all memory used
459  *
460  * @param t Pointer to the tunnel.
461  * @param call_cleaner Whether to call the cleaner handler.
462  *
463  * @return Handle to the required tunnel or NULL if not found.
464  */
465 static void
466 destroy_tunnel (struct GNUNET_MESH_Tunnel *t, int call_cleaner)
467 {
468   struct GNUNET_MESH_Handle *h;
469   struct GNUNET_MESH_TransmitHandle *th;
470   struct GNUNET_MESH_TransmitHandle *next;
471
472   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroy_tunnel %X\n", t->tid);
473
474   if (NULL == t)
475   {
476     GNUNET_break (0);
477     return;
478   }
479   h = t->mesh;
480
481   GNUNET_CONTAINER_DLL_remove (h->tunnels_head, h->tunnels_tail, t);
482
483   /* signal tunnel destruction */
484   if ( (NULL != h->cleaner) && (0 != t->peer) && (GNUNET_YES == call_cleaner) )
485     h->cleaner (h->cls, t, t->ctx);
486
487   /* check that clients did not leave messages behind in the queue */
488   for (th = h->th_head; NULL != th; th = next)
489   {
490     next = th->next;
491     if (th->tunnel != t)
492       continue;
493     /* Clients should have aborted their requests already.
494      * Management traffic should be ok, as clients can't cancel that */
495     GNUNET_break (GNUNET_NO == th_is_payload(th));
496     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
497
498     /* clean up request */
499     if (GNUNET_SCHEDULER_NO_TASK != th->timeout_task)
500       GNUNET_SCHEDULER_cancel (th->timeout_task);
501     GNUNET_free (th);    
502   }
503
504   /* if there are no more pending requests with mesh service, cancel active request */
505   /* Note: this should be unnecessary... */
506   if ((0 == message_ready_size (h)) && (NULL != h->th))
507   {
508     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
509     h->th = NULL;
510   }
511
512   if (0 != t->peer)
513     GNUNET_PEER_change_rc (t->peer, -1);
514   GNUNET_free (t);
515   return;
516 }
517
518
519 /**
520  * Notify client that the transmission has timed out
521  * 
522  * @param cls closure
523  * @param tc task context
524  */
525 static void
526 timeout_transmission (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
527 {
528   struct GNUNET_MESH_TransmitHandle *th = cls;
529   struct GNUNET_MESH_Handle *mesh;
530
531   mesh = th->tunnel->mesh;
532   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
533   th->tunnel->packet_size = 0;
534   if (GNUNET_YES == th_is_payload (th))
535     th->notify (th->notify_cls, 0, NULL);
536   GNUNET_free (th);
537   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
538   {
539     /* nothing ready to transmit, no point in asking for transmission */
540     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
541     mesh->th = NULL;
542   }
543 }
544
545
546 /**
547  * Add a transmit handle to the transmission queue and set the
548  * timeout if needed.
549  *
550  * @param h mesh handle with the queue head and tail
551  * @param th handle to the packet to be transmitted
552  */
553 static void
554 add_to_queue (struct GNUNET_MESH_Handle *h,
555               struct GNUNET_MESH_TransmitHandle *th)
556 {
557   GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
558   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value == th->timeout.abs_value)
559     return;
560   th->timeout_task =
561       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
562                                     (th->timeout), &timeout_transmission, th);
563 }
564
565
566 /**
567  * Auxiliary function to send an already constructed packet to the service.
568  * Takes care of creating a new queue element, copying the message and
569  * calling the tmt_rdy function if necessary.
570  *
571  * @param h mesh handle
572  * @param msg message to transmit
573  * @param tunnel tunnel this send is related to (NULL if N/A)
574  */
575 static void
576 send_packet (struct GNUNET_MESH_Handle *h,
577              const struct GNUNET_MessageHeader *msg,
578              struct GNUNET_MESH_Tunnel *tunnel);
579
580
581 /**
582  * Send an ack on the tunnel to confirm the processing of a message.
583  * 
584  * @param t Tunnel on which to send the ACK.
585  */
586 static void
587 send_ack (struct GNUNET_MESH_Tunnel *t)
588 {
589   struct GNUNET_MESH_LocalAck msg;
590
591   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on tunnel %X\n", t->tid);
592   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
593   msg.header.size = htons (sizeof (msg));
594   msg.tunnel_id = htonl (t->tid);
595
596 #if DEBUG_ACK
597   t->mesh->acks_sent++;
598 #endif
599
600   send_packet (t->mesh, &msg.header, t);
601   return;
602 }
603
604
605
606 /**
607  * Reconnect callback: tries to reconnect again after a failer previous
608  * reconnecttion
609  * @param cls closure (mesh handle)
610  * @param tc task context
611  */
612 static void
613 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
614
615
616 /**
617  * Send a connect packet to the service with the applications and types
618  * requested by the user.
619  *
620  * @param h The mesh handle.
621  *
622  */
623 static void
624 send_connect (struct GNUNET_MESH_Handle *h)
625 {
626   size_t size;
627
628   size = sizeof (struct GNUNET_MESH_ClientConnect);
629   size += h->n_ports * sizeof (uint32_t);
630   {
631     char buf[size] GNUNET_ALIGN;
632     struct GNUNET_MESH_ClientConnect *msg;
633     uint32_t *ports;
634     uint16_t i;
635
636     /* build connection packet */
637     msg = (struct GNUNET_MESH_ClientConnect *) buf;
638     msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT);
639     msg->header.size = htons (size);
640     ports = (uint32_t *) &msg[1];
641     for (i = 0; i < h->n_ports; i++)
642     {
643       ports[i] = htonl (h->ports[i]);
644       LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n",
645            h->ports[i]);
646     }
647     LOG (GNUNET_ERROR_TYPE_DEBUG,
648          "Sending %lu bytes long message with %u ports\n",
649          ntohs (msg->header.size), h->n_ports);
650     send_packet (h, &msg->header, NULL);
651   }
652 }
653
654
655 /**
656  * Reconnect to the service, retransmit all infomation to try to restore the
657  * original state.
658  *
659  * @param h handle to the mesh
660  *
661  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
662  */
663 static int
664 do_reconnect (struct GNUNET_MESH_Handle *h)
665 {
666   struct GNUNET_MESH_Tunnel *t;
667
668   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
669   LOG (GNUNET_ERROR_TYPE_DEBUG, "*******   RECONNECT   *******\n");
670   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
671   LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h);
672   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
673
674   /* disconnect */
675   if (NULL != h->th)
676   {
677     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
678     h->th = NULL;
679   }
680   if (NULL != h->client)
681   {
682     GNUNET_CLIENT_disconnect (h->client);
683   }
684
685   /* connect again */
686   h->client = GNUNET_CLIENT_connect ("mesh", h->cfg);
687   if (h->client == NULL)
688   {
689     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
690                                                       &reconnect_cbk, h);
691     h->reconnect_time =
692         GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
693                                   GNUNET_TIME_relative_multiply
694                                   (h->reconnect_time, 2));
695     LOG (GNUNET_ERROR_TYPE_DEBUG, 
696          "Next retry in %s\n",
697          GNUNET_STRINGS_relative_time_to_string (h->reconnect_time,
698                                                  GNUNET_NO));
699     GNUNET_break (0);
700     return GNUNET_NO;
701   }
702   else
703   {
704     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
705   }
706   send_connect (h);
707   /* Rebuild all tunnels */
708   for (t = h->tunnels_head; NULL != t; t = t->next)
709   {
710     struct GNUNET_MESH_TunnelMessage tmsg;
711     uint32_t options;
712
713     if (t->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
714     {
715       /* Tunnel was created by service (incoming tunnel) */
716       /* TODO: Notify service of missing tunnel, to request
717        * creator to recreate path (find a path to him via DHT?)
718        */
719       continue;
720     }
721     t->allow_send = GNUNET_NO;
722     tmsg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
723     tmsg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
724     tmsg.tunnel_id = htonl (t->tid);
725     tmsg.port = htonl (t->port);
726     GNUNET_PEER_resolve (t->peer, &tmsg.peer);
727
728     options = 0;
729     if (GNUNET_YES == t->nobuffer)
730       options |= GNUNET_MESH_OPTION_NOBUFFER;
731
732     if (GNUNET_YES == t->reliable)
733       options |= GNUNET_MESH_OPTION_RELIABLE;
734
735     tmsg.opt = htonl (options);
736     send_packet (h, &tmsg.header, t);
737   }
738   return GNUNET_YES;
739 }
740
741 /**
742  * Reconnect callback: tries to reconnect again after a failer previous
743  * reconnecttion
744  * @param cls closure (mesh handle)
745  * @param tc task context
746  */
747 static void
748 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
749 {
750   struct GNUNET_MESH_Handle *h = cls;
751
752   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
753   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
754     return;
755   do_reconnect (h);
756 }
757
758
759 /**
760  * Reconnect to the service, retransmit all infomation to try to restore the
761  * original state.
762  *
763  * @param h handle to the mesh
764  *
765  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
766  */
767 static void
768 reconnect (struct GNUNET_MESH_Handle *h)
769 {
770   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requested RECONNECT\n");
771   h->in_receive = GNUNET_NO;
772   if (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task)
773     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
774                                                       &reconnect_cbk, h);
775 }
776
777
778 /******************************************************************************/
779 /***********************      RECEIVE HANDLERS     ****************************/
780 /******************************************************************************/
781
782 /**
783  * Process the new tunnel notification and add it to the tunnels in the handle
784  *
785  * @param h     The mesh handle
786  * @param msg   A message with the details of the new incoming tunnel
787  */
788 static void
789 process_tunnel_created (struct GNUNET_MESH_Handle *h,
790                         const struct GNUNET_MESH_TunnelMessage *msg)
791 {
792   struct GNUNET_MESH_Tunnel *t;
793   MESH_TunnelNumber tid;
794   uint32_t port;
795
796   tid = ntohl (msg->tunnel_id);
797   port = ntohl (msg->port);
798   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming tunnel %X:%u\n", tid, port);
799   if (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
800   {
801     GNUNET_break (0);
802     return;
803   }
804   if (NULL != h->new_tunnel)
805   {
806     t = create_tunnel (h, tid);
807     t->allow_send = GNUNET_NO;
808     t->peer = GNUNET_PEER_intern (&msg->peer);
809     t->mesh = h;
810     t->tid = tid;
811     t->port = port;
812     if (0 != (msg->opt & GNUNET_MESH_OPTION_NOBUFFER))
813       t->nobuffer = GNUNET_YES;
814     else
815       t->nobuffer = GNUNET_NO;
816     if (0 != (msg->opt & GNUNET_MESH_OPTION_RELIABLE))
817       t->reliable = GNUNET_YES;
818     else
819       t->reliable = GNUNET_NO;
820     if (GNUNET_YES == t->reliable &&
821         0 != (msg->opt & GNUNET_MESH_OPTION_OOORDER))
822       t->ooorder = GNUNET_YES;
823     else
824       t->ooorder = GNUNET_NO;
825     LOG (GNUNET_ERROR_TYPE_DEBUG, "  created tunnel %p\n", t);
826     t->ctx = h->new_tunnel (h->cls, t, &msg->peer, t->port);
827     LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
828   }
829   else
830   {
831     struct GNUNET_MESH_TunnelMessage d_msg;
832
833     LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming tunnels\n");
834
835     d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY);
836     d_msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
837     d_msg.tunnel_id = msg->tunnel_id;
838     memset (&d_msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
839     d_msg.port = 0;
840     d_msg.opt = 0;
841
842     send_packet (h, &d_msg.header, NULL);
843   }
844   return;
845 }
846
847
848 /**
849  * Process the tunnel destroy notification and free associated resources
850  *
851  * @param h     The mesh handle
852  * @param msg   A message with the details of the tunnel being destroyed
853  */
854 static void
855 process_tunnel_destroy (struct GNUNET_MESH_Handle *h,
856                         const struct GNUNET_MESH_TunnelMessage *msg)
857 {
858   struct GNUNET_MESH_Tunnel *t;
859   MESH_TunnelNumber tid;
860
861   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying tunnel from service\n");
862   tid = ntohl (msg->tunnel_id);
863   t = retrieve_tunnel (h, tid);
864
865   if (NULL == t)
866   {
867     LOG (GNUNET_ERROR_TYPE_DEBUG, "tunnel %X unknown\n", tid);
868     return;
869   }
870   LOG (GNUNET_ERROR_TYPE_DEBUG, "tunnel %X destroyed\n", t->tid);
871   destroy_tunnel (t, GNUNET_YES);
872 }
873
874
875 /**
876  * Process the incoming data packets, call appropriate handlers.
877  *
878  * @param h         The mesh handle
879  * @param message   A message encapsulating the data
880  */
881 static void
882 process_incoming_data (struct GNUNET_MESH_Handle *h,
883                        const struct GNUNET_MessageHeader *message)
884 {
885   const struct GNUNET_MessageHeader *payload;
886   const struct GNUNET_MESH_MessageHandler *handler;
887   struct GNUNET_MESH_LocalData *dmsg;
888   struct GNUNET_MESH_Tunnel *t;
889   unsigned int i;
890   uint16_t type;
891
892   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n");
893
894   dmsg = (struct GNUNET_MESH_LocalData *) message;
895
896   t = retrieve_tunnel (h, ntohl (dmsg->tid));
897   payload = (struct GNUNET_MessageHeader *) &dmsg[1];
898   LOG (GNUNET_ERROR_TYPE_DEBUG, "  %s data on tunnel %s [%X]\n",
899        t->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV ? "fwd" : "bck",
900        GNUNET_i2s (GNUNET_PEER_resolve2(t->peer)), ntohl (dmsg->tid));
901   if (NULL == t)
902   {
903     /* Tunnel was ignored/destroyed, probably service didn't get it yet */
904     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ignored!\n");
905     return;
906   }
907   type = ntohs (payload->type);
908   LOG (GNUNET_ERROR_TYPE_DEBUG, "  payload type %u\n", type);
909   for (i = 0; i < h->n_handlers; i++)
910   {
911     handler = &h->message_handlers[i];
912     LOG (GNUNET_ERROR_TYPE_DEBUG,
913          "    checking handler for type %u\n",
914          handler->type);
915     if (handler->type == type)
916     {
917       if (GNUNET_OK !=
918           handler->callback (h->cls, t, &t->ctx, payload))
919       {
920         LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
921         GNUNET_MESH_tunnel_destroy (t);
922         return;
923       }
924       else
925       {
926         LOG (GNUNET_ERROR_TYPE_DEBUG,
927              "callback completed successfully\n");
928       }
929     }
930   }
931 }
932
933
934 /**
935  * Process a local ACK message, enabling the client to send
936  * more data to the service.
937  * 
938  * @param h Mesh handle.
939  * @param message Message itself.
940  */
941 static void
942 process_ack (struct GNUNET_MESH_Handle *h,
943              const struct GNUNET_MessageHeader *message)
944 {
945   struct GNUNET_MESH_LocalAck *msg;
946   struct GNUNET_MESH_Tunnel *t;
947   MESH_TunnelNumber tid;
948
949   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n");
950   h->acks_recv++;
951   msg = (struct GNUNET_MESH_LocalAck *) message;
952   tid = ntohl (msg->tunnel_id);
953   t = retrieve_tunnel (h, tid);
954   if (NULL == t)
955   {
956     LOG (GNUNET_ERROR_TYPE_WARNING, "ACK on unknown tunnel %X\n", tid);
957     return;
958   }
959   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on tunnel %X!\n", t->tid);
960   t->allow_send = GNUNET_YES;
961   if (NULL == h->th && 0 < t->packet_size)
962   {
963     LOG (GNUNET_ERROR_TYPE_DEBUG, "  tmt rdy was NULL, requesting!\n");
964     h->th =
965         GNUNET_CLIENT_notify_transmit_ready (h->client, t->packet_size,
966                                              GNUNET_TIME_UNIT_FOREVER_REL,
967                                              GNUNET_YES, &send_callback, h);
968   }
969 }
970
971
972 /**
973  * Process a local reply about info on all tunnels, pass info to the user.
974  *
975  * @param h Mesh handle.
976  * @param message Message itself.
977  */
978 static void
979 process_get_tunnels (struct GNUNET_MESH_Handle *h,
980                      const struct GNUNET_MessageHeader *message)
981 {
982   struct GNUNET_MESH_LocalMonitor *msg;
983
984   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Tunnels messasge received\n");
985
986   if (NULL == h->tunnels_cb)
987   {
988     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
989     return;
990   }
991
992   msg = (struct GNUNET_MESH_LocalMonitor *) message;
993   if (ntohs (message->size) !=
994       (sizeof (struct GNUNET_MESH_LocalMonitor) +
995        sizeof (struct GNUNET_PeerIdentity)))
996   {
997     GNUNET_break_op (0);
998     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
999                 "Get tunnels message: size %hu - expected %u\n",
1000                 ntohs (message->size),
1001                 sizeof (struct GNUNET_MESH_LocalMonitor));
1002     return;
1003   }
1004   h->tunnels_cb (h->tunnels_cls,
1005                  ntohl (msg->tunnel_id),
1006                  &msg->owner,
1007                  &msg->destination);
1008 }
1009
1010
1011
1012 /**
1013  * Process a local monitor_tunnel reply, pass info to the user.
1014  *
1015  * @param h Mesh handle.
1016  * @param message Message itself.
1017  */
1018 static void
1019 process_show_tunnel (struct GNUNET_MESH_Handle *h,
1020                      const struct GNUNET_MessageHeader *message)
1021 {
1022   struct GNUNET_MESH_LocalMonitor *msg;
1023   size_t esize;
1024
1025   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Tunnel messasge received\n");
1026
1027   if (NULL == h->tunnel_cb)
1028   {
1029     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1030     return;
1031   }
1032
1033   /* Verify message sanity */
1034   msg = (struct GNUNET_MESH_LocalMonitor *) message;
1035   esize = sizeof (struct GNUNET_MESH_LocalMonitor);
1036   if (ntohs (message->size) != esize)
1037   {
1038     GNUNET_break_op (0);
1039     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1040                 "Show tunnel message: size %hu - expected %u\n",
1041                 ntohs (message->size),
1042                 esize);
1043
1044     h->tunnel_cb (h->tunnel_cls, NULL, NULL);
1045     h->tunnel_cb = NULL;
1046     h->tunnel_cls = NULL;
1047
1048     return;
1049   }
1050
1051   h->tunnel_cb (h->tunnel_cls,
1052                 &msg->destination,
1053                 &msg->owner);
1054 }
1055
1056
1057 /**
1058  * Function to process all messages received from the service
1059  *
1060  * @param cls closure
1061  * @param msg message received, NULL on timeout or fatal error
1062  */
1063 static void
1064 msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
1065 {
1066   struct GNUNET_MESH_Handle *h = cls;
1067   uint16_t type;
1068
1069   if (msg == NULL)
1070   {
1071     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1072          "Mesh service disconnected, reconnecting\n", h);
1073     reconnect (h);
1074     return;
1075   }
1076   type = ntohs (msg->type);
1077   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1078   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n",
1079        GNUNET_MESH_DEBUG_M2S (type));
1080   switch (type)
1081   {
1082     /* Notify of a new incoming tunnel */
1083   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE:
1084     process_tunnel_created (h, (struct GNUNET_MESH_TunnelMessage *) msg);
1085     break;
1086     /* Notify of a tunnel disconnection */
1087   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY:
1088     process_tunnel_destroy (h, (struct GNUNET_MESH_TunnelMessage *) msg);
1089     break;
1090     /* Notify of a new data packet in the tunnel */
1091   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA:
1092     process_incoming_data (h, msg);
1093     break;
1094   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
1095     process_ack (h, msg);
1096     break;
1097   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS:
1098     process_get_tunnels (h, msg);
1099     break;
1100   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL:
1101     process_show_tunnel (h, msg);
1102     break;
1103   default:
1104     /* We shouldn't get any other packages, log and ignore */
1105     LOG (GNUNET_ERROR_TYPE_WARNING,
1106          "unsolicited message form service (type %s)\n",
1107          GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
1108   }
1109   LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n");
1110   if (GNUNET_YES == h->in_receive)
1111   {
1112     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1113                            GNUNET_TIME_UNIT_FOREVER_REL);
1114   }
1115   else
1116   {
1117     LOG (GNUNET_ERROR_TYPE_DEBUG,
1118          "in receive off, not calling CLIENT_receive\n");
1119   }
1120 }
1121
1122
1123 /******************************************************************************/
1124 /************************       SEND FUNCTIONS     ****************************/
1125 /******************************************************************************/
1126
1127 /**
1128  * Function called to send a message to the service.
1129  * "buf" will be NULL and "size" zero if the socket was closed for writing in
1130  * the meantime.
1131  *
1132  * @param cls closure, the mesh handle
1133  * @param size number of bytes available in buf
1134  * @param buf where the callee should write the connect message
1135  * @return number of bytes written to buf
1136  */
1137 static size_t
1138 send_callback (void *cls, size_t size, void *buf)
1139 {
1140   struct GNUNET_MESH_Handle *h = cls;
1141   struct GNUNET_MESH_TransmitHandle *th;
1142   struct GNUNET_MESH_TransmitHandle *next;
1143   struct GNUNET_MESH_Tunnel *t;
1144   char *cbuf = buf;
1145   size_t tsize;
1146   size_t psize;
1147   size_t nsize;
1148
1149   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1150   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() Buffer %u\n", size);
1151   if ((0 == size) || (NULL == buf))
1152   {
1153     LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h);
1154     reconnect (h);
1155     h->th = NULL;
1156     return 0;
1157   }
1158   tsize = 0;
1159   next = h->th_head;
1160   nsize = message_ready_size (h);
1161   while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
1162   {
1163     t = th->tunnel;
1164     if (GNUNET_YES == th_is_payload (th))
1165     {
1166       struct GNUNET_MESH_LocalData *dmsg;
1167       struct GNUNET_MessageHeader *mh;
1168
1169       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload\n");
1170       if (GNUNET_NO == t->allow_send)
1171       {
1172         /* This tunnel is not ready to transmit yet, try next message */
1173         next = th->next;
1174         continue;
1175       }
1176       t->packet_size = 0;
1177       GNUNET_assert (size >= th->size);
1178       dmsg = (struct GNUNET_MESH_LocalData *) cbuf;
1179       mh = (struct GNUNET_MessageHeader *) &dmsg[1];
1180       psize = th->notify (th->notify_cls,
1181                           size - sizeof (struct GNUNET_MESH_LocalData),
1182                           mh);
1183       if (psize > 0)
1184       {
1185         psize += sizeof (struct GNUNET_MESH_LocalData);
1186         GNUNET_assert (size >= psize);
1187         dmsg->header.size = htons (psize);
1188         dmsg->tid = htonl (t->tid);
1189         dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA);
1190         LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload type %s\n",
1191              GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1192         t->allow_send = GNUNET_NO;
1193       }
1194       else
1195       {
1196         LOG (GNUNET_ERROR_TYPE_DEBUG,
1197              "#  callback returned size 0, "
1198              "application canceled transmission\n");
1199       }
1200     }
1201     else
1202     {
1203       struct GNUNET_MessageHeader *mh = (struct GNUNET_MessageHeader *) &th[1];
1204
1205       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  mesh internal traffic, type %s\n",
1206            GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1207       memcpy (cbuf, &th[1], th->size);
1208       psize = th->size;
1209     }
1210     if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1211       GNUNET_SCHEDULER_cancel (th->timeout_task);
1212     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1213     GNUNET_free (th);
1214     next = h->th_head;
1215     nsize = message_ready_size (h);
1216     cbuf += psize;
1217     size -= psize;
1218     tsize += psize;
1219   }
1220   LOG (GNUNET_ERROR_TYPE_DEBUG, "#  total size: %u\n", tsize);
1221   h->th = NULL;
1222   size = message_ready_size (h);
1223   if (0 != size)
1224   {
1225     LOG (GNUNET_ERROR_TYPE_DEBUG, "#  next size: %u\n", size);
1226     h->th =
1227         GNUNET_CLIENT_notify_transmit_ready (h->client, size,
1228                                              GNUNET_TIME_UNIT_FOREVER_REL,
1229                                              GNUNET_YES, &send_callback, h);
1230   }
1231   else
1232   {
1233     if (NULL != h->th_head)
1234       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  can't transmit any more\n");
1235     else
1236       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  nothing left to transmit\n");
1237   }
1238   if (GNUNET_NO == h->in_receive)
1239   {
1240     LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n");
1241     h->in_receive = GNUNET_YES;
1242     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1243                            GNUNET_TIME_UNIT_FOREVER_REL);
1244   }
1245   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() END\n");
1246   return tsize;
1247 }
1248
1249
1250 /**
1251  * Auxiliary function to send an already constructed packet to the service.
1252  * Takes care of creating a new queue element, copying the message and
1253  * calling the tmt_rdy function if necessary.
1254  * 
1255  * @param h mesh handle
1256  * @param msg message to transmit
1257  * @param tunnel tunnel this send is related to (NULL if N/A)
1258  */
1259 static void
1260 send_packet (struct GNUNET_MESH_Handle *h,
1261              const struct GNUNET_MessageHeader *msg,
1262              struct GNUNET_MESH_Tunnel *tunnel)
1263 {
1264   struct GNUNET_MESH_TransmitHandle *th;
1265   size_t msize;
1266
1267   LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n",
1268        GNUNET_MESH_DEBUG_M2S(ntohs(msg->type)));
1269   msize = ntohs (msg->size);
1270   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle) + msize);
1271   th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1272   th->size = msize;
1273   th->tunnel = tunnel;
1274   memcpy (&th[1], msg, msize);
1275   add_to_queue (h, th);
1276   LOG (GNUNET_ERROR_TYPE_DEBUG, "  queued\n");
1277   if (NULL != h->th)
1278     return;
1279   LOG (GNUNET_ERROR_TYPE_DEBUG, "  calling ntfy tmt rdy for %u bytes\n", msize);
1280   h->th =
1281       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
1282                                            GNUNET_TIME_UNIT_FOREVER_REL,
1283                                            GNUNET_YES, &send_callback, h);
1284 }
1285
1286
1287 /******************************************************************************/
1288 /**********************      API CALL DEFINITIONS     *************************/
1289 /******************************************************************************/
1290
1291 struct GNUNET_MESH_Handle *
1292 GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls,
1293                      GNUNET_MESH_InboundTunnelNotificationHandler new_tunnel,
1294                      GNUNET_MESH_TunnelEndHandler cleaner,
1295                      const struct GNUNET_MESH_MessageHandler *handlers,
1296                      const uint32_t *ports)
1297 {
1298   struct GNUNET_MESH_Handle *h;
1299
1300   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect()\n");
1301   h = GNUNET_malloc (sizeof (struct GNUNET_MESH_Handle));
1302   LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h);
1303   h->cfg = cfg;
1304   h->new_tunnel = new_tunnel;
1305   h->cleaner = cleaner;
1306   h->client = GNUNET_CLIENT_connect ("mesh", cfg);
1307   if (h->client == NULL)
1308   {
1309     GNUNET_break (0);
1310     GNUNET_free (h);
1311     return NULL;
1312   }
1313   h->cls = cls;
1314   h->message_handlers = handlers;
1315   h->ports = ports;
1316   h->next_tid = GNUNET_MESH_LOCAL_TUNNEL_ID_CLI;
1317   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1318   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1319
1320   if (NULL != ports && ports[0] != 0 && NULL == new_tunnel)
1321   {
1322     GNUNET_break (0);
1323     LOG (GNUNET_ERROR_TYPE_DEBUG,
1324          "no new tunnel handler given, ports parameter is useless!!\n");
1325   }
1326   if ((NULL == ports || ports[0] == 0) && NULL != new_tunnel)
1327   {
1328     GNUNET_break (0);
1329     LOG (GNUNET_ERROR_TYPE_DEBUG,
1330          "no ports given, new tunnel handler will never be called!!\n");
1331   }
1332   /* count handlers */
1333   for (h->n_handlers = 0;
1334        handlers && handlers[h->n_handlers].type;
1335        h->n_handlers++) ;
1336   for (h->n_ports = 0;
1337        ports && ports[h->n_ports];
1338        h->n_ports++) ;
1339   send_connect (h);
1340   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect() END\n");
1341   return h;
1342 }
1343
1344
1345 void
1346 GNUNET_MESH_disconnect (struct GNUNET_MESH_Handle *handle)
1347 {
1348   struct GNUNET_MESH_Tunnel *t;
1349   struct GNUNET_MESH_Tunnel *aux;
1350   struct GNUNET_MESH_TransmitHandle *th;
1351
1352   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH DISCONNECT\n");
1353
1354 #if DEBUG_ACK
1355   LOG (GNUNET_ERROR_TYPE_INFO, "Sent %d ACKs\n", handle->acks_sent);
1356   LOG (GNUNET_ERROR_TYPE_INFO, "Recv %d ACKs\n\n", handle->acks_recv);
1357 #endif
1358
1359   t = handle->tunnels_head;
1360   while (NULL != t)
1361   {
1362     aux = t->next;
1363     if (t->tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
1364     {
1365       GNUNET_break (0);
1366       LOG (GNUNET_ERROR_TYPE_DEBUG, "tunnel %X not destroyed\n", t->tid);
1367     }
1368     destroy_tunnel (t, GNUNET_YES);
1369     t = aux;
1370   }
1371   while ( (th = handle->th_head) != NULL)
1372   {
1373     struct GNUNET_MessageHeader *msg;
1374
1375     /* Make sure it is an allowed packet (everything else should have been
1376      * already canceled).
1377      */
1378     GNUNET_break (GNUNET_NO == th_is_payload (th));
1379     msg = (struct GNUNET_MessageHeader *) &th[1];
1380     switch (ntohs(msg->type))
1381     {
1382       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT:
1383       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY:
1384       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS:
1385       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL:
1386         break;
1387       default:
1388         GNUNET_break (0);
1389         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected msg %u\n",
1390              ntohs(msg->type));
1391     }
1392
1393     GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th);
1394     GNUNET_free (th);
1395   }
1396
1397   if (NULL != handle->th)
1398   {
1399     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1400     handle->th = NULL;
1401   }
1402   if (NULL != handle->client)
1403   {
1404     GNUNET_CLIENT_disconnect (handle->client);
1405     handle->client = NULL;
1406   }
1407   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
1408   {
1409     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1410     handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1411   }
1412   GNUNET_free (handle);
1413 }
1414
1415
1416 /**
1417  * Create a new tunnel (we're initiator and will be allowed to add/remove peers
1418  * and to broadcast).
1419  *
1420  * @param h mesh handle
1421  * @param tunnel_ctx client's tunnel context to associate with the tunnel
1422  * @param peer peer identity the tunnel should go to
1423  * @param port Port number.
1424  * @param nobuffer Flag for disabling buffering on relay nodes.
1425  * @param reliable Flag for end-to-end reliability.
1426  *
1427  * @return handle to the tunnel
1428  */
1429 struct GNUNET_MESH_Tunnel *
1430 GNUNET_MESH_tunnel_create (struct GNUNET_MESH_Handle *h, 
1431                            void *tunnel_ctx,
1432                            const struct GNUNET_PeerIdentity *peer,
1433                            uint32_t port,
1434                            int nobuffer,
1435                            int reliable)
1436 {
1437   struct GNUNET_MESH_Tunnel *t;
1438   struct GNUNET_MESH_TunnelMessage msg;
1439
1440   LOG (GNUNET_ERROR_TYPE_DEBUG,
1441        "Creating new tunnel to %s:%u\n",
1442        GNUNET_i2s (peer), port);
1443   t = create_tunnel (h, 0);
1444   LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", t);
1445   LOG (GNUNET_ERROR_TYPE_DEBUG, "  number %X\n", t->tid);
1446   t->ctx = tunnel_ctx;
1447   t->peer = GNUNET_PEER_intern (peer);
1448   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
1449   msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
1450   msg.tunnel_id = htonl (t->tid);
1451   msg.port = htonl (port);
1452   msg.peer = *peer;
1453   msg.opt = 0;
1454   if (GNUNET_YES == reliable)
1455     msg.opt |= GNUNET_MESH_OPTION_RELIABLE;
1456   if (GNUNET_YES == nobuffer)
1457     msg.opt |= GNUNET_MESH_OPTION_NOBUFFER;
1458   msg.opt = htonl (msg.opt);
1459   t->allow_send = 0;
1460   send_packet (h, &msg.header, t);
1461   return t;
1462 }
1463
1464
1465 void
1466 GNUNET_MESH_tunnel_destroy (struct GNUNET_MESH_Tunnel *tunnel)
1467 {
1468   struct GNUNET_MESH_Handle *h;
1469   struct GNUNET_MESH_TunnelMessage msg;
1470   struct GNUNET_MESH_TransmitHandle *th;
1471
1472   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying tunnel\n");
1473   h = tunnel->mesh;
1474
1475   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY);
1476   msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
1477   msg.tunnel_id = htonl (tunnel->tid);
1478   memset (&msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
1479   msg.port = 0;
1480   msg.opt = 0;
1481   th = h->th_head;
1482   while (th != NULL)
1483   {
1484     struct GNUNET_MESH_TransmitHandle *aux;
1485     if (th->tunnel == tunnel)
1486     {
1487       aux = th->next;
1488       /* FIXME call the handler? */
1489       if (GNUNET_YES == th_is_payload (th))
1490         th->notify (th->notify_cls, 0, NULL);
1491       GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1492       GNUNET_free (th);
1493       th = aux;
1494     }
1495     else
1496       th = th->next;
1497   }
1498
1499   destroy_tunnel (tunnel, GNUNET_YES);
1500   send_packet (h, &msg.header, NULL);
1501 }
1502
1503
1504 /**
1505  * Get information about a tunnel.
1506  *
1507  * @param tunnel Tunnel handle.
1508  * @param option Query (GNUNET_MESH_OPTION_*).
1509  * @param ... dependant on option, currently not used
1510  *
1511  * @return Union with an answer to the query.
1512  */
1513 const union MeshTunnelInfo *
1514 GNUNET_MESH_tunnel_get_info (struct GNUNET_MESH_Tunnel *tunnel,
1515                              enum MeshTunnelOption option, ...)
1516 {
1517   const union MeshTunnelInfo *ret;
1518
1519   switch (option)
1520   {
1521     case GNUNET_MESH_OPTION_NOBUFFER:
1522       ret = (const union MeshTunnelInfo *) &tunnel->nobuffer;
1523       break;
1524     case GNUNET_MESH_OPTION_RELIABLE:
1525       ret = (const union MeshTunnelInfo *) &tunnel->reliable;
1526       break;
1527     case GNUNET_MESH_OPTION_OOORDER:
1528       ret = (const union MeshTunnelInfo *) &tunnel->ooorder;
1529       break;
1530     case GNUNET_MESH_OPTION_PEER:
1531       ret = (const union MeshTunnelInfo *) &tunnel->peer;
1532       break;
1533     default:
1534       GNUNET_break (0);
1535       return NULL;
1536   }
1537
1538   return ret;
1539 }
1540
1541 struct GNUNET_MESH_TransmitHandle *
1542 GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork,
1543                                    struct GNUNET_TIME_Relative maxdelay,
1544                                    size_t notify_size,
1545                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1546                                    void *notify_cls)
1547 {
1548   struct GNUNET_MESH_TransmitHandle *th;
1549
1550   GNUNET_assert (NULL != tunnel);
1551   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY\n");
1552   LOG (GNUNET_ERROR_TYPE_DEBUG, "    on tunnel %X\n", tunnel->tid);
1553   LOG (GNUNET_ERROR_TYPE_DEBUG, "    allow_send %d\n", tunnel->allow_send);
1554   if (tunnel->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
1555     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to origin\n");
1556   else
1557     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to destination\n");
1558   LOG (GNUNET_ERROR_TYPE_DEBUG, "    payload size %u\n", notify_size);
1559   GNUNET_assert (NULL != notify);
1560   GNUNET_assert (0 == tunnel->packet_size); // Only one data packet allowed
1561   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle));
1562   th->tunnel = tunnel;
1563   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1564   th->size = notify_size + sizeof (struct GNUNET_MESH_LocalData);
1565   tunnel->packet_size = th->size;
1566   LOG (GNUNET_ERROR_TYPE_DEBUG, "    total size %u\n", th->size);
1567   th->notify = notify;
1568   th->notify_cls = notify_cls;
1569   add_to_queue (tunnel->mesh, th);
1570   if (NULL != tunnel->mesh->th)
1571     return th;
1572   if (GNUNET_NO == tunnel->allow_send)
1573     return th;
1574   LOG (GNUNET_ERROR_TYPE_DEBUG, "    call client notify tmt rdy\n");
1575   tunnel->mesh->th =
1576       GNUNET_CLIENT_notify_transmit_ready (tunnel->mesh->client, th->size,
1577                                            GNUNET_TIME_UNIT_FOREVER_REL,
1578                                            GNUNET_YES, &send_callback,
1579                                            tunnel->mesh);
1580   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY END\n");
1581   return th;
1582 }
1583
1584
1585 void
1586 GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th)
1587 {
1588   struct GNUNET_MESH_Handle *mesh;
1589
1590   th->tunnel->packet_size = 0;
1591   mesh = th->tunnel->mesh;
1592   if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1593     GNUNET_SCHEDULER_cancel (th->timeout_task);
1594   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
1595   GNUNET_free (th);
1596   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
1597   {
1598     /* queue empty, no point in asking for transmission */
1599     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
1600     mesh->th = NULL;
1601   }
1602 }
1603
1604 void
1605 GNUNET_MESH_receive_done (struct GNUNET_MESH_Tunnel *tunnel)
1606 {
1607   send_ack (tunnel);
1608 }
1609
1610
1611 /**
1612  * Request information about the running mesh peer.
1613  * The callback will be called for every tunnel known to the service,
1614  * listing all active peers that blong to the tunnel.
1615  *
1616  * If called again on the same handle, it will overwrite the previous
1617  * callback and cls. To retrieve the cls, monitor_cancel must be
1618  * called first.
1619  *
1620  * WARNING: unstable API, likely to change in the future!
1621  *
1622  * @param h Handle to the mesh peer.
1623  * @param callback Function to call with the requested data.
1624  * @param callback_cls Closure for @c callback.
1625  */
1626 void
1627 GNUNET_MESH_get_tunnels (struct GNUNET_MESH_Handle *h,
1628                          GNUNET_MESH_TunnelsCB callback,
1629                          void *callback_cls)
1630 {
1631   struct GNUNET_MessageHeader msg;
1632
1633   msg.size = htons (sizeof (msg));
1634   msg.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS);
1635   send_packet (h, &msg, NULL);
1636   h->tunnels_cb = callback;
1637   h->tunnels_cls = callback_cls;
1638
1639   return;
1640 }
1641
1642
1643 /**
1644  * Cancel a monitor request. The monitor callback will not be called.
1645  *
1646  * @param h Mesh handle.
1647  *
1648  * @return Closure given to GNUNET_MESH_monitor, if any.
1649  */
1650 void *
1651 GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h)
1652 {
1653   void *cls;
1654
1655   cls = h->tunnels_cls;
1656   h->tunnels_cb = NULL;
1657   h->tunnels_cls = NULL;
1658   return cls;
1659 }
1660
1661
1662 /**
1663  * Request information about a specific tunnel of the running mesh peer.
1664  *
1665  * WARNING: unstable API, likely to change in the future!
1666  * FIXME Add destination option.
1667  *
1668  * @param h Handle to the mesh peer.
1669  * @param initiator ID of the owner of the tunnel.
1670  * @param tunnel_number Tunnel number.
1671  * @param callback Function to call with the requested data.
1672  * @param callback_cls Closure for @c callback.
1673  */
1674 void
1675 GNUNET_MESH_show_tunnel (struct GNUNET_MESH_Handle *h,
1676                          struct GNUNET_PeerIdentity *initiator,
1677                          unsigned int tunnel_number,
1678                          GNUNET_MESH_TunnelCB callback,
1679                          void *callback_cls)
1680 {
1681   struct GNUNET_MESH_LocalMonitor msg;
1682
1683   msg.header.size = htons (sizeof (msg));
1684   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL);
1685   msg.owner = *initiator;
1686   msg.tunnel_id = htonl (tunnel_number);
1687   msg.reserved = 0;
1688   send_packet (h, &msg.header, NULL);
1689   h->tunnel_cb = callback;
1690   h->tunnel_cls = callback_cls;
1691
1692   return;
1693 }
1694
1695
1696 /**
1697  * Function called to notify a client about the connection
1698  * begin ready to queue more data.  "buf" will be
1699  * NULL and "size" zero if the connection was closed for
1700  * writing in the meantime.
1701  *
1702  * @param cls closure
1703  * @param size number of bytes available in buf
1704  * @param buf where the callee should write the message
1705  * @return number of bytes written to buf
1706  */
1707 static size_t
1708 mesh_mq_ntr (void *cls, size_t size,
1709              void *buf)
1710 {
1711   struct GNUNET_MQ_Handle *mq = cls; 
1712   struct MeshMQState *state = GNUNET_MQ_impl_state (mq);
1713   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
1714   uint16_t msize;
1715
1716   state->th = NULL;
1717   if (NULL == buf)
1718   {
1719     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
1720     return 0;
1721   }
1722   msize = ntohs (msg->size);
1723   GNUNET_assert (msize <= size);
1724   memcpy (buf, msg, msize);
1725   GNUNET_MQ_impl_send_continue (mq);
1726   return msize;
1727 }
1728
1729
1730 /**
1731  * Signature of functions implementing the
1732  * sending functionality of a message queue.
1733  *
1734  * @param mq the message queue
1735  * @param msg the message to send
1736  * @param impl_state state of the implementation
1737  */
1738 static void
1739 mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq,
1740                    const struct GNUNET_MessageHeader *msg, void *impl_state)
1741 {
1742   struct MeshMQState *state = impl_state;
1743
1744   GNUNET_assert (NULL == state->th);
1745   GNUNET_MQ_impl_send_commit (mq);
1746   state->th =
1747       GNUNET_MESH_notify_transmit_ready (state->tunnel,
1748                                          /* FIXME: add option for corking */
1749                                          GNUNET_NO,
1750                                          GNUNET_TIME_UNIT_FOREVER_REL, 
1751                                          ntohs (msg->size),
1752                                          mesh_mq_ntr, mq);
1753
1754 }
1755
1756
1757 /**
1758  * Signature of functions implementing the
1759  * destruction of a message queue.
1760  * Implementations must not free 'mq', but should
1761  * take care of 'impl_state'.
1762  * 
1763  * @param mq the message queue to destroy
1764  * @param impl_state state of the implementation
1765  */
1766 static void
1767 mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
1768 {
1769   struct MeshMQState *state = impl_state;
1770
1771   if (NULL != state->th)
1772     GNUNET_MESH_notify_transmit_ready_cancel (state->th);
1773
1774   GNUNET_free (state);
1775 }
1776
1777
1778 /**
1779  * Create a message queue for a mesh tunnel.
1780  * The message queue can only be used to transmit messages,
1781  * not to receive them.
1782  *
1783  * @param tunnel the tunnel to create the message qeue for
1784  * @return a message queue to messages over the tunnel
1785  */
1786 struct GNUNET_MQ_Handle *
1787 GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel)
1788 {
1789   struct GNUNET_MQ_Handle *mq;
1790   struct MeshMQState *state;
1791
1792   state = GNUNET_new (struct MeshMQState);
1793   state->tunnel = tunnel;
1794
1795   mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl,
1796                                       mesh_mq_destroy_impl,
1797                                       NULL, /* FIXME: cancel impl. */
1798                                       state,
1799                                       NULL, /* no msg handlers */
1800                                       NULL, /* no err handlers */
1801                                       NULL); /* no handler cls */
1802   return mq;
1803 }
1804