- extra wait time
[oweals/gnunet.git] / src / mesh / mesh_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4      GNUnet is free software; you can redistribute it and/or modify
5      it under the terms of the GNU General Public License as published
6      by the Free Software Foundation; either version 3, or (at your
7      option) any later version.
8      GNUnet is distributed in the hope that it will be useful, but
9      WITHOUT ANY WARRANTY; without even the implied warranty of
10      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11      General Public License for more details.
12      You should have received a copy of the GNU General Public License
13      along with GNUnet; see the file COPYING.  If not, write to the
14      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
15      Boston, MA 02111-1307, USA.
16 */
17
18 /**
19  * @file mesh/mesh_api.c
20  * @brief mesh api: client implementation of new mesh service
21  * @author Bartlomiej Polot
22  *
23  * STRUCTURE:
24  * - DATA STRUCTURES
25  * - DECLARATIONS
26  * - AUXILIARY FUNCTIONS
27  * - RECEIVE HANDLERS
28  * - SEND FUNCTIONS
29  * - API CALL DEFINITIONS
30  *
31  * TODO: add regex to reconnect
32  */
33 #include "platform.h"
34 #include "gnunet_common.h"
35 #include "gnunet_client_lib.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_peer_lib.h"
38 #include "gnunet_mesh_service.h"
39 #include "mesh.h"
40 #include "mesh_protocol.h"
41
42 #define LOG(kind,...) GNUNET_log_from (kind, "mesh-api",__VA_ARGS__)
43 #define DEBUG_ACK GNUNET_YES
44
45 /******************************************************************************/
46 /************************      DATA STRUCTURES     ****************************/
47 /******************************************************************************/
48
49 /**
50  * Transmission queue to the service
51  */
52 struct GNUNET_MESH_TransmitHandle
53 {
54
55     /**
56      * Double Linked list
57      */
58   struct GNUNET_MESH_TransmitHandle *next;
59
60     /**
61      * Double Linked list
62      */
63   struct GNUNET_MESH_TransmitHandle *prev;
64
65     /**
66      * Tunnel this message is sent on / for (may be NULL for control messages).
67      */
68   struct GNUNET_MESH_Tunnel *tunnel;
69
70     /**
71      * Callback to obtain the message to transmit, or NULL if we
72      * got the message in 'data'.  Notice that messages built
73      * by 'notify' need to be encapsulated with information about
74      * the 'target'.
75      */
76   GNUNET_CONNECTION_TransmitReadyNotify notify;
77
78     /**
79      * Closure for 'notify'
80      */
81   void *notify_cls;
82
83     /**
84      * How long is this message valid.  Once the timeout has been
85      * reached, the message must no longer be sent.  If this
86      * is a message with a 'notify' callback set, the 'notify'
87      * function should be called with 'buf' NULL and size 0.
88      */
89   struct GNUNET_TIME_Absolute timeout;
90
91     /**
92      * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER.
93      */
94   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
95
96     /**
97      * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL.
98      */
99   size_t size;
100 };
101
102
103 /**
104  * Opaque handle to the service.
105  */
106 struct GNUNET_MESH_Handle
107 {
108
109     /**
110      * Handle to the server connection, to send messages later
111      */
112   struct GNUNET_CLIENT_Connection *client;
113
114     /**
115      * Set of handlers used for processing incoming messages in the tunnels
116      */
117   const struct GNUNET_MESH_MessageHandler *message_handlers;
118
119   /**
120    * Number of handlers in the handlers array.
121    */
122   unsigned int n_handlers;
123
124   /**
125    * Ports open.
126    */
127   const uint32_t *ports;
128
129   /**
130    * Number of ports.
131    */
132   unsigned int n_ports;
133
134     /**
135      * Double linked list of the tunnels this client is connected to, head.
136      */
137   struct GNUNET_MESH_Tunnel *tunnels_head;
138
139     /**
140      * Double linked list of the tunnels this client is connected to, tail.
141      */
142   struct GNUNET_MESH_Tunnel *tunnels_tail;
143
144     /**
145      * Callback for inbound tunnel creation
146      */
147   GNUNET_MESH_InboundTunnelNotificationHandler *new_tunnel;
148
149     /**
150      * Callback for inbound tunnel disconnection
151      */
152   GNUNET_MESH_TunnelEndHandler *cleaner;
153
154     /**
155      * Handle to cancel pending transmissions in case of disconnection
156      */
157   struct GNUNET_CLIENT_TransmitHandle *th;
158
159     /**
160      * Closure for all the handlers given by the client
161      */
162   void *cls;
163
164     /**
165      * Messages to send to the service, head.
166      */
167   struct GNUNET_MESH_TransmitHandle *th_head;
168
169     /**
170      * Messages to send to the service, tail.
171      */
172   struct GNUNET_MESH_TransmitHandle *th_tail;
173
174     /**
175      * tid of the next tunnel to create (to avoid reusing IDs often)
176      */
177   MESH_TunnelNumber next_tid;
178
179     /**
180      * Have we started the task to receive messages from the service
181      * yet? We do this after we send the 'MESH_LOCAL_CONNECT' message.
182      */
183   int in_receive;
184
185   /**
186    * Configuration given by the client, in case of reconnection
187    */
188   const struct GNUNET_CONFIGURATION_Handle *cfg;
189
190   /**
191    * Time to the next reconnect in case one reconnect fails
192    */
193   struct GNUNET_TIME_Relative reconnect_time;
194   
195   /**
196    * Task for trying to reconnect.
197    */
198   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
199
200   /**
201    * Monitor callback
202    */
203   GNUNET_MESH_TunnelsCB tunnels_cb;
204
205   /**
206    * Monitor callback closure.
207    */
208   void *tunnels_cls;
209
210   /**
211    * Tunnel callback.
212    */
213   GNUNET_MESH_TunnelCB tunnel_cb;
214
215   /**
216    * Tunnel callback closure.
217    */
218   void *tunnel_cls;
219
220 #if DEBUG_ACK
221   unsigned int acks_sent;
222   unsigned int acks_recv;
223 #endif
224 };
225
226
227 /**
228  * Description of a peer
229  */
230 struct GNUNET_MESH_Peer
231 {
232   /**
233    * ID of the peer in short form
234    */
235   GNUNET_PEER_Id id;
236
237   /**
238    * Tunnel this peer belongs to
239    */
240   struct GNUNET_MESH_Tunnel *t;
241
242   /**
243    * Flag indicating whether service has informed about its connection
244    * FIXME-BART: is this flag used? Seems dead right now...
245    */
246   int connected;
247
248 };
249
250
251 /**
252  * Opaque handle to a tunnel.
253  */
254 struct GNUNET_MESH_Tunnel
255 {
256
257     /**
258      * DLL next
259      */
260   struct GNUNET_MESH_Tunnel *next;
261
262     /**
263      * DLL prev
264      */
265   struct GNUNET_MESH_Tunnel *prev;
266
267     /**
268      * Handle to the mesh this tunnel belongs to
269      */
270   struct GNUNET_MESH_Handle *mesh;
271
272     /**
273      * Local ID of the tunnel
274      */
275   MESH_TunnelNumber tid;
276
277     /**
278      * Port number.
279      */
280   uint32_t port;
281
282     /**
283      * Other end of the tunnel.
284      */
285   GNUNET_PEER_Id peer;
286
287   /**
288    * Any data the caller wants to put in here
289    */
290   void *ctx;
291
292     /**
293      * Size of packet queued in this tunnel
294      */
295   unsigned int packet_size;
296
297     /**
298      * Is the tunnel allowed to buffer?
299      */
300   int nobuffer;
301
302     /**
303      * Is the tunnel realiable?
304      */
305   int reliable;
306
307     /**
308      * If reliable, is the tunnel out of order?
309      */
310   int ooorder;
311
312     /**
313      * Are we allowed to send to the service?
314      */
315   int allow_send;
316
317 };
318
319
320 /**
321  * Implementation state for mesh's message queue.
322  */
323 struct MeshMQState
324 {
325   /**
326    * The current transmit handle, or NULL
327    * if no transmit is active.
328    */
329   struct GNUNET_MESH_TransmitHandle *th;
330
331   /**
332    * Tunnel to send the data over.
333    */
334   struct GNUNET_MESH_Tunnel *tunnel;
335 };
336
337
338 /******************************************************************************/
339 /***********************         DECLARATIONS         *************************/
340 /******************************************************************************/
341
342 /**
343  * Function called to send a message to the service.
344  * "buf" will be NULL and "size" zero if the socket was closed for writing in
345  * the meantime.
346  *
347  * @param cls closure, the mesh handle
348  * @param size number of bytes available in buf
349  * @param buf where the callee should write the connect message
350  * @return number of bytes written to buf
351  */
352 static size_t
353 send_callback (void *cls, size_t size, void *buf);
354
355
356 /******************************************************************************/
357 /***********************     AUXILIARY FUNCTIONS      *************************/
358 /******************************************************************************/
359
360 /**
361  * Check if transmission is a payload packet.
362  *
363  * @param th Transmission handle.
364  *
365  * @return GNUNET_YES if it is a payload packet,
366  *         GNUNET_NO if it is a mesh management packet.
367  */
368 static int
369 th_is_payload (struct GNUNET_MESH_TransmitHandle *th)
370 {
371   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
372 }
373
374
375 /**
376  * Check whether there is any message ready in the queue and find the size.
377  * 
378  * @param h Mesh handle.
379  * 
380  * @return The size of the first ready message in the queue,
381  *         0 if there is none.
382  */
383 static size_t
384 message_ready_size (struct GNUNET_MESH_Handle *h)
385 {
386   struct GNUNET_MESH_TransmitHandle *th;
387   struct GNUNET_MESH_Tunnel *t;
388
389   for (th = h->th_head; NULL != th; th = th->next)
390   {
391     t = th->tunnel;
392     if (GNUNET_NO == th_is_payload (th))
393     {
394       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message internal\n");
395       return th->size;
396     }
397     if (GNUNET_YES == t->allow_send)
398     {
399       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message payload ok\n");
400       return th->size;
401     }
402   }
403   return 0;
404 }
405
406
407 /**
408  * Get the tunnel handler for the tunnel specified by id from the given handle
409  * @param h Mesh handle
410  * @param tid ID of the wanted tunnel
411  * @return handle to the required tunnel or NULL if not found
412  */
413 static struct GNUNET_MESH_Tunnel *
414 retrieve_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelNumber tid)
415 {
416   struct GNUNET_MESH_Tunnel *t;
417
418   t = h->tunnels_head;
419   while (t != NULL)
420   {
421     if (t->tid == tid)
422       return t;
423     t = t->next;
424   }
425   return NULL;
426 }
427
428
429 /**
430  * Create a new tunnel and insert it in the tunnel list of the mesh handle
431  * @param h Mesh handle
432  * @param tid desired tid of the tunnel, 0 to assign one automatically
433  * @return handle to the created tunnel
434  */
435 static struct GNUNET_MESH_Tunnel *
436 create_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelNumber tid)
437 {
438   struct GNUNET_MESH_Tunnel *t;
439
440   t = GNUNET_malloc (sizeof (struct GNUNET_MESH_Tunnel));
441   GNUNET_CONTAINER_DLL_insert (h->tunnels_head, h->tunnels_tail, t);
442   t->mesh = h;
443   if (0 == tid)
444   {
445     t->tid = h->next_tid;
446     while (NULL != retrieve_tunnel (h, h->next_tid))
447     {
448       h->next_tid++;
449       h->next_tid &= ~GNUNET_MESH_LOCAL_TUNNEL_ID_SERV;
450       h->next_tid |= GNUNET_MESH_LOCAL_TUNNEL_ID_CLI;
451     }
452   }
453   else
454   {
455     t->tid = tid;
456   }
457   t->allow_send = GNUNET_NO;
458   t->nobuffer = GNUNET_NO;
459   return t;
460 }
461
462
463 /**
464  * Destroy the specified tunnel.
465  * - Destroys all peers, calling the disconnect callback on each if needed
466  * - Cancels all outgoing traffic for that tunnel, calling respective notifys
467  * - Calls cleaner if tunnel was inbound
468  * - Frees all memory used
469  *
470  * @param t Pointer to the tunnel.
471  * @param call_cleaner Whether to call the cleaner handler.
472  *
473  * @return Handle to the required tunnel or NULL if not found.
474  */
475 static void
476 destroy_tunnel (struct GNUNET_MESH_Tunnel *t, int call_cleaner)
477 {
478   struct GNUNET_MESH_Handle *h;
479   struct GNUNET_MESH_TransmitHandle *th;
480   struct GNUNET_MESH_TransmitHandle *next;
481
482   if (NULL == t)
483   {
484     GNUNET_break (0);
485     return;
486   }
487   LOG (GNUNET_ERROR_TYPE_DEBUG, 
488        "destroy_tunnel %X\n", 
489        t->tid);
490   h = t->mesh;
491
492   GNUNET_CONTAINER_DLL_remove (h->tunnels_head, h->tunnels_tail, t);
493
494   /* signal tunnel destruction */
495   if ( (NULL != h->cleaner) && (0 != t->peer) && (GNUNET_YES == call_cleaner) )
496     h->cleaner (h->cls, t, t->ctx);
497
498   /* check that clients did not leave messages behind in the queue */
499   for (th = h->th_head; NULL != th; th = next)
500   {
501     next = th->next;
502     if (th->tunnel != t)
503       continue;
504     /* Clients should have aborted their requests already.
505      * Management traffic should be ok, as clients can't cancel that */
506     GNUNET_break (GNUNET_NO == th_is_payload(th));
507     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
508
509     /* clean up request */
510     if (GNUNET_SCHEDULER_NO_TASK != th->timeout_task)
511       GNUNET_SCHEDULER_cancel (th->timeout_task);
512     GNUNET_free (th);    
513   }
514
515   /* if there are no more pending requests with mesh service, cancel active request */
516   /* Note: this should be unnecessary... */
517   if ((0 == message_ready_size (h)) && (NULL != h->th))
518   {
519     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
520     h->th = NULL;
521   }
522
523   if (0 != t->peer)
524     GNUNET_PEER_change_rc (t->peer, -1);
525   GNUNET_free (t);
526   return;
527 }
528
529
530 /**
531  * Notify client that the transmission has timed out
532  * 
533  * @param cls closure
534  * @param tc task context
535  */
536 static void
537 timeout_transmission (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
538 {
539   struct GNUNET_MESH_TransmitHandle *th = cls;
540   struct GNUNET_MESH_Handle *mesh;
541
542   mesh = th->tunnel->mesh;
543   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
544   th->tunnel->packet_size = 0;
545   if (GNUNET_YES == th_is_payload (th))
546     th->notify (th->notify_cls, 0, NULL);
547   GNUNET_free (th);
548   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
549   {
550     /* nothing ready to transmit, no point in asking for transmission */
551     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
552     mesh->th = NULL;
553   }
554 }
555
556
557 /**
558  * Add a transmit handle to the transmission queue and set the
559  * timeout if needed.
560  *
561  * @param h mesh handle with the queue head and tail
562  * @param th handle to the packet to be transmitted
563  */
564 static void
565 add_to_queue (struct GNUNET_MESH_Handle *h,
566               struct GNUNET_MESH_TransmitHandle *th)
567 {
568   GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
569   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us == th->timeout.abs_value_us)
570     return;
571   th->timeout_task =
572       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
573                                     (th->timeout), &timeout_transmission, th);
574 }
575
576
577 /**
578  * Auxiliary function to send an already constructed packet to the service.
579  * Takes care of creating a new queue element, copying the message and
580  * calling the tmt_rdy function if necessary.
581  *
582  * @param h mesh handle
583  * @param msg message to transmit
584  * @param tunnel tunnel this send is related to (NULL if N/A)
585  */
586 static void
587 send_packet (struct GNUNET_MESH_Handle *h,
588              const struct GNUNET_MessageHeader *msg,
589              struct GNUNET_MESH_Tunnel *tunnel);
590
591
592 /**
593  * Send an ack on the tunnel to confirm the processing of a message.
594  * 
595  * @param t Tunnel on which to send the ACK.
596  */
597 static void
598 send_ack (struct GNUNET_MESH_Tunnel *t)
599 {
600   struct GNUNET_MESH_LocalAck msg;
601
602   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on tunnel %X\n", t->tid);
603   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
604   msg.header.size = htons (sizeof (msg));
605   msg.tunnel_id = htonl (t->tid);
606
607 #if DEBUG_ACK
608   t->mesh->acks_sent++;
609 #endif
610
611   send_packet (t->mesh, &msg.header, t);
612   return;
613 }
614
615
616
617 /**
618  * Reconnect callback: tries to reconnect again after a failer previous
619  * reconnecttion
620  * @param cls closure (mesh handle)
621  * @param tc task context
622  */
623 static void
624 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
625
626
627 /**
628  * Send a connect packet to the service with the applications and types
629  * requested by the user.
630  *
631  * @param h The mesh handle.
632  *
633  */
634 static void
635 send_connect (struct GNUNET_MESH_Handle *h)
636 {
637   size_t size;
638
639   size = sizeof (struct GNUNET_MESH_ClientConnect);
640   size += h->n_ports * sizeof (uint32_t);
641   {
642     char buf[size] GNUNET_ALIGN;
643     struct GNUNET_MESH_ClientConnect *msg;
644     uint32_t *ports;
645     uint16_t i;
646
647     /* build connection packet */
648     msg = (struct GNUNET_MESH_ClientConnect *) buf;
649     msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT);
650     msg->header.size = htons (size);
651     ports = (uint32_t *) &msg[1];
652     for (i = 0; i < h->n_ports; i++)
653     {
654       ports[i] = htonl (h->ports[i]);
655       LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n",
656            h->ports[i]);
657     }
658     LOG (GNUNET_ERROR_TYPE_DEBUG,
659          "Sending %lu bytes long message with %u ports\n",
660          ntohs (msg->header.size), h->n_ports);
661     send_packet (h, &msg->header, NULL);
662   }
663 }
664
665
666 /**
667  * Reconnect to the service, retransmit all infomation to try to restore the
668  * original state.
669  *
670  * @param h handle to the mesh
671  *
672  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
673  */
674 static int
675 do_reconnect (struct GNUNET_MESH_Handle *h)
676 {
677   struct GNUNET_MESH_Tunnel *t;
678
679   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
680   LOG (GNUNET_ERROR_TYPE_DEBUG, "*******   RECONNECT   *******\n");
681   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
682   LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h);
683   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
684
685   /* disconnect */
686   if (NULL != h->th)
687   {
688     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
689     h->th = NULL;
690   }
691   if (NULL != h->client)
692   {
693     GNUNET_CLIENT_disconnect (h->client);
694   }
695
696   /* connect again */
697   h->client = GNUNET_CLIENT_connect ("mesh", h->cfg);
698   if (h->client == NULL)
699   {
700     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
701                                                       &reconnect_cbk, h);
702     h->reconnect_time =
703         GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
704                                   GNUNET_TIME_relative_multiply
705                                   (h->reconnect_time, 2));
706     LOG (GNUNET_ERROR_TYPE_DEBUG, 
707          "Next retry in %s\n",
708          GNUNET_STRINGS_relative_time_to_string (h->reconnect_time,
709                                                  GNUNET_NO));
710     GNUNET_break (0);
711     return GNUNET_NO;
712   }
713   else
714   {
715     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
716   }
717   send_connect (h);
718   /* Rebuild all tunnels */
719   for (t = h->tunnels_head; NULL != t; t = t->next)
720   {
721     struct GNUNET_MESH_TunnelMessage tmsg;
722     uint32_t options;
723
724     if (t->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
725     {
726       /* Tunnel was created by service (incoming tunnel) */
727       /* TODO: Notify service of missing tunnel, to request
728        * creator to recreate path (find a path to him via DHT?)
729        */
730       continue;
731     }
732     t->allow_send = GNUNET_NO;
733     tmsg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
734     tmsg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
735     tmsg.tunnel_id = htonl (t->tid);
736     tmsg.port = htonl (t->port);
737     GNUNET_PEER_resolve (t->peer, &tmsg.peer);
738
739     options = 0;
740     if (GNUNET_YES == t->nobuffer)
741       options |= GNUNET_MESH_OPTION_NOBUFFER;
742
743     if (GNUNET_YES == t->reliable)
744       options |= GNUNET_MESH_OPTION_RELIABLE;
745
746     tmsg.opt = htonl (options);
747     send_packet (h, &tmsg.header, t);
748   }
749   return GNUNET_YES;
750 }
751
752 /**
753  * Reconnect callback: tries to reconnect again after a failer previous
754  * reconnecttion
755  * @param cls closure (mesh handle)
756  * @param tc task context
757  */
758 static void
759 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
760 {
761   struct GNUNET_MESH_Handle *h = cls;
762
763   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
764   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
765     return;
766   do_reconnect (h);
767 }
768
769
770 /**
771  * Reconnect to the service, retransmit all infomation to try to restore the
772  * original state.
773  *
774  * @param h handle to the mesh
775  *
776  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
777  */
778 static void
779 reconnect (struct GNUNET_MESH_Handle *h)
780 {
781   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requested RECONNECT\n");
782   h->in_receive = GNUNET_NO;
783   if (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task)
784     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
785                                                       &reconnect_cbk, h);
786 }
787
788
789 /******************************************************************************/
790 /***********************      RECEIVE HANDLERS     ****************************/
791 /******************************************************************************/
792
793 /**
794  * Process the new tunnel notification and add it to the tunnels in the handle
795  *
796  * @param h     The mesh handle
797  * @param msg   A message with the details of the new incoming tunnel
798  */
799 static void
800 process_tunnel_created (struct GNUNET_MESH_Handle *h,
801                         const struct GNUNET_MESH_TunnelMessage *msg)
802 {
803   struct GNUNET_MESH_Tunnel *t;
804   MESH_TunnelNumber tid;
805   uint32_t port;
806
807   tid = ntohl (msg->tunnel_id);
808   port = ntohl (msg->port);
809   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming tunnel %X:%u\n", tid, port);
810   if (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
811   {
812     GNUNET_break (0);
813     return;
814   }
815   if (NULL != h->new_tunnel)
816   {
817     t = create_tunnel (h, tid);
818     t->allow_send = GNUNET_NO;
819     t->peer = GNUNET_PEER_intern (&msg->peer);
820     t->mesh = h;
821     t->tid = tid;
822     t->port = port;
823     if (0 != (msg->opt & GNUNET_MESH_OPTION_NOBUFFER))
824       t->nobuffer = GNUNET_YES;
825     else
826       t->nobuffer = GNUNET_NO;
827     if (0 != (msg->opt & GNUNET_MESH_OPTION_RELIABLE))
828       t->reliable = GNUNET_YES;
829     else
830       t->reliable = GNUNET_NO;
831     if (GNUNET_YES == t->reliable &&
832         0 != (msg->opt & GNUNET_MESH_OPTION_OOORDER))
833       t->ooorder = GNUNET_YES;
834     else
835       t->ooorder = GNUNET_NO;
836     LOG (GNUNET_ERROR_TYPE_DEBUG, "  created tunnel %p\n", t);
837     t->ctx = h->new_tunnel (h->cls, t, &msg->peer, t->port);
838     LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
839   }
840   else
841   {
842     struct GNUNET_MESH_TunnelMessage d_msg;
843
844     LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming tunnels\n");
845
846     d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY);
847     d_msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
848     d_msg.tunnel_id = msg->tunnel_id;
849     memset (&d_msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
850     d_msg.port = 0;
851     d_msg.opt = 0;
852
853     send_packet (h, &d_msg.header, NULL);
854   }
855   return;
856 }
857
858
859 /**
860  * Process the tunnel destroy notification and free associated resources
861  *
862  * @param h     The mesh handle
863  * @param msg   A message with the details of the tunnel being destroyed
864  */
865 static void
866 process_tunnel_destroy (struct GNUNET_MESH_Handle *h,
867                         const struct GNUNET_MESH_TunnelMessage *msg)
868 {
869   struct GNUNET_MESH_Tunnel *t;
870   MESH_TunnelNumber tid;
871
872   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying tunnel from service\n");
873   tid = ntohl (msg->tunnel_id);
874   t = retrieve_tunnel (h, tid);
875
876   if (NULL == t)
877   {
878     LOG (GNUNET_ERROR_TYPE_DEBUG, "tunnel %X unknown\n", tid);
879     return;
880   }
881   LOG (GNUNET_ERROR_TYPE_DEBUG, "tunnel %X destroyed\n", t->tid);
882   destroy_tunnel (t, GNUNET_YES);
883 }
884
885
886 /**
887  * Process the incoming data packets, call appropriate handlers.
888  *
889  * @param h         The mesh handle
890  * @param message   A message encapsulating the data
891  */
892 static void
893 process_incoming_data (struct GNUNET_MESH_Handle *h,
894                        const struct GNUNET_MessageHeader *message)
895 {
896   const struct GNUNET_MessageHeader *payload;
897   const struct GNUNET_MESH_MessageHandler *handler;
898   struct GNUNET_MESH_LocalData *dmsg;
899   struct GNUNET_MESH_Tunnel *t;
900   unsigned int i;
901   uint16_t type;
902
903   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n");
904
905   dmsg = (struct GNUNET_MESH_LocalData *) message;
906
907   t = retrieve_tunnel (h, ntohl (dmsg->tid));
908   payload = (struct GNUNET_MessageHeader *) &dmsg[1];
909   if (NULL == t)
910   {
911     /* Tunnel was ignored/destroyed, probably service didn't get it yet */
912     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ignored!\n");
913     return;
914   }
915   LOG (GNUNET_ERROR_TYPE_DEBUG,
916        "  %s data on tunnel %s [%X]\n",
917        t->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV ? "fwd" : "bck",
918        GNUNET_i2s (GNUNET_PEER_resolve2(t->peer)), 
919        ntohl (dmsg->tid));
920   type = ntohs (payload->type);
921   LOG (GNUNET_ERROR_TYPE_DEBUG, "  payload type %u\n", type);
922   for (i = 0; i < h->n_handlers; i++)
923   {
924     handler = &h->message_handlers[i];
925     LOG (GNUNET_ERROR_TYPE_DEBUG,
926          "    checking handler for type %u\n",
927          handler->type);
928     if (handler->type == type)
929     {
930       if (GNUNET_OK !=
931           handler->callback (h->cls, t, &t->ctx, payload))
932       {
933         LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
934         GNUNET_MESH_tunnel_destroy (t);
935         return;
936       }
937       else
938       {
939         LOG (GNUNET_ERROR_TYPE_DEBUG,
940              "callback completed successfully\n");
941       }
942     }
943   }
944 }
945
946
947 /**
948  * Process a local ACK message, enabling the client to send
949  * more data to the service.
950  * 
951  * @param h Mesh handle.
952  * @param message Message itself.
953  */
954 static void
955 process_ack (struct GNUNET_MESH_Handle *h,
956              const struct GNUNET_MessageHeader *message)
957 {
958   struct GNUNET_MESH_LocalAck *msg;
959   struct GNUNET_MESH_Tunnel *t;
960   MESH_TunnelNumber tid;
961
962   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n");
963   h->acks_recv++;
964   msg = (struct GNUNET_MESH_LocalAck *) message;
965   tid = ntohl (msg->tunnel_id);
966   t = retrieve_tunnel (h, tid);
967   if (NULL == t)
968   {
969     LOG (GNUNET_ERROR_TYPE_WARNING, "ACK on unknown tunnel %X\n", tid);
970     return;
971   }
972   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on tunnel %X!\n", t->tid);
973   t->allow_send = GNUNET_YES;
974   if (NULL == h->th && 0 < t->packet_size)
975   {
976     LOG (GNUNET_ERROR_TYPE_DEBUG, "  tmt rdy was NULL, requesting!\n");
977     h->th =
978         GNUNET_CLIENT_notify_transmit_ready (h->client, t->packet_size,
979                                              GNUNET_TIME_UNIT_FOREVER_REL,
980                                              GNUNET_YES, &send_callback, h);
981   }
982 }
983
984
985 /**
986  * Process a local reply about info on all tunnels, pass info to the user.
987  *
988  * @param h Mesh handle.
989  * @param message Message itself.
990  */
991 static void
992 process_get_tunnels (struct GNUNET_MESH_Handle *h,
993                      const struct GNUNET_MessageHeader *message)
994 {
995   struct GNUNET_MESH_LocalMonitor *msg;
996
997   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Tunnels messasge received\n");
998
999   if (NULL == h->tunnels_cb)
1000   {
1001     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1002     return;
1003   }
1004
1005   msg = (struct GNUNET_MESH_LocalMonitor *) message;
1006   if (ntohs (message->size) !=
1007       (sizeof (struct GNUNET_MESH_LocalMonitor) +
1008        sizeof (struct GNUNET_PeerIdentity)))
1009   {
1010     GNUNET_break_op (0);
1011     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1012                 "Get tunnels message: size %hu - expected %u\n",
1013                 ntohs (message->size),
1014                 sizeof (struct GNUNET_MESH_LocalMonitor));
1015     return;
1016   }
1017   h->tunnels_cb (h->tunnels_cls,
1018                  ntohl (msg->tunnel_id),
1019                  &msg->owner,
1020                  &msg->destination);
1021 }
1022
1023
1024
1025 /**
1026  * Process a local monitor_tunnel reply, pass info to the user.
1027  *
1028  * @param h Mesh handle.
1029  * @param message Message itself.
1030  */
1031 static void
1032 process_show_tunnel (struct GNUNET_MESH_Handle *h,
1033                      const struct GNUNET_MessageHeader *message)
1034 {
1035   struct GNUNET_MESH_LocalMonitor *msg;
1036   size_t esize;
1037
1038   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Tunnel messasge received\n");
1039
1040   if (NULL == h->tunnel_cb)
1041   {
1042     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1043     return;
1044   }
1045
1046   /* Verify message sanity */
1047   msg = (struct GNUNET_MESH_LocalMonitor *) message;
1048   esize = sizeof (struct GNUNET_MESH_LocalMonitor);
1049   if (ntohs (message->size) != esize)
1050   {
1051     GNUNET_break_op (0);
1052     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1053                 "Show tunnel message: size %hu - expected %u\n",
1054                 ntohs (message->size),
1055                 esize);
1056
1057     h->tunnel_cb (h->tunnel_cls, NULL, NULL);
1058     h->tunnel_cb = NULL;
1059     h->tunnel_cls = NULL;
1060
1061     return;
1062   }
1063
1064   h->tunnel_cb (h->tunnel_cls,
1065                 &msg->destination,
1066                 &msg->owner);
1067 }
1068
1069
1070 /**
1071  * Function to process all messages received from the service
1072  *
1073  * @param cls closure
1074  * @param msg message received, NULL on timeout or fatal error
1075  */
1076 static void
1077 msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
1078 {
1079   struct GNUNET_MESH_Handle *h = cls;
1080   uint16_t type;
1081
1082   if (msg == NULL)
1083   {
1084     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1085          "Mesh service disconnected, reconnecting\n", h);
1086     reconnect (h);
1087     return;
1088   }
1089   type = ntohs (msg->type);
1090   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1091   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n",
1092        GNUNET_MESH_DEBUG_M2S (type));
1093   switch (type)
1094   {
1095     /* Notify of a new incoming tunnel */
1096   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE:
1097     process_tunnel_created (h, (struct GNUNET_MESH_TunnelMessage *) msg);
1098     break;
1099     /* Notify of a tunnel disconnection */
1100   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY:
1101     process_tunnel_destroy (h, (struct GNUNET_MESH_TunnelMessage *) msg);
1102     break;
1103     /* Notify of a new data packet in the tunnel */
1104   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA:
1105     process_incoming_data (h, msg);
1106     break;
1107   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
1108     process_ack (h, msg);
1109     break;
1110   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS:
1111     process_get_tunnels (h, msg);
1112     break;
1113   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL:
1114     process_show_tunnel (h, msg);
1115     break;
1116   default:
1117     /* We shouldn't get any other packages, log and ignore */
1118     LOG (GNUNET_ERROR_TYPE_WARNING,
1119          "unsolicited message form service (type %s)\n",
1120          GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
1121   }
1122   LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n");
1123   if (GNUNET_YES == h->in_receive)
1124   {
1125     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1126                            GNUNET_TIME_UNIT_FOREVER_REL);
1127   }
1128   else
1129   {
1130     LOG (GNUNET_ERROR_TYPE_DEBUG,
1131          "in receive off, not calling CLIENT_receive\n");
1132   }
1133 }
1134
1135
1136 /******************************************************************************/
1137 /************************       SEND FUNCTIONS     ****************************/
1138 /******************************************************************************/
1139
1140 /**
1141  * Function called to send a message to the service.
1142  * "buf" will be NULL and "size" zero if the socket was closed for writing in
1143  * the meantime.
1144  *
1145  * @param cls closure, the mesh handle
1146  * @param size number of bytes available in buf
1147  * @param buf where the callee should write the connect message
1148  * @return number of bytes written to buf
1149  */
1150 static size_t
1151 send_callback (void *cls, size_t size, void *buf)
1152 {
1153   struct GNUNET_MESH_Handle *h = cls;
1154   struct GNUNET_MESH_TransmitHandle *th;
1155   struct GNUNET_MESH_TransmitHandle *next;
1156   struct GNUNET_MESH_Tunnel *t;
1157   char *cbuf = buf;
1158   size_t tsize;
1159   size_t psize;
1160   size_t nsize;
1161
1162   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1163   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() Buffer %u\n", size);
1164   if ((0 == size) || (NULL == buf))
1165   {
1166     LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h);
1167     reconnect (h);
1168     h->th = NULL;
1169     return 0;
1170   }
1171   tsize = 0;
1172   next = h->th_head;
1173   nsize = message_ready_size (h);
1174   while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
1175   {
1176     t = th->tunnel;
1177     if (GNUNET_YES == th_is_payload (th))
1178     {
1179       struct GNUNET_MESH_LocalData *dmsg;
1180       struct GNUNET_MessageHeader *mh;
1181
1182       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload\n");
1183       if (GNUNET_NO == t->allow_send)
1184       {
1185         /* This tunnel is not ready to transmit yet, try next message */
1186         next = th->next;
1187         continue;
1188       }
1189       t->packet_size = 0;
1190       GNUNET_assert (size >= th->size);
1191       dmsg = (struct GNUNET_MESH_LocalData *) cbuf;
1192       mh = (struct GNUNET_MessageHeader *) &dmsg[1];
1193       psize = th->notify (th->notify_cls,
1194                           size - sizeof (struct GNUNET_MESH_LocalData),
1195                           mh);
1196       if (psize > 0)
1197       {
1198         psize += sizeof (struct GNUNET_MESH_LocalData);
1199         GNUNET_assert (size >= psize);
1200         dmsg->header.size = htons (psize);
1201         dmsg->tid = htonl (t->tid);
1202         dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA);
1203         LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload type %s\n",
1204              GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1205         t->allow_send = GNUNET_NO;
1206       }
1207       else
1208       {
1209         LOG (GNUNET_ERROR_TYPE_DEBUG,
1210              "#  callback returned size 0, "
1211              "application canceled transmission\n");
1212       }
1213     }
1214     else
1215     {
1216       struct GNUNET_MessageHeader *mh = (struct GNUNET_MessageHeader *) &th[1];
1217
1218       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  mesh internal traffic, type %s\n",
1219            GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1220       memcpy (cbuf, &th[1], th->size);
1221       psize = th->size;
1222     }
1223     if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1224       GNUNET_SCHEDULER_cancel (th->timeout_task);
1225     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1226     GNUNET_free (th);
1227     next = h->th_head;
1228     nsize = message_ready_size (h);
1229     cbuf += psize;
1230     size -= psize;
1231     tsize += psize;
1232   }
1233   LOG (GNUNET_ERROR_TYPE_DEBUG, "#  total size: %u\n", tsize);
1234   h->th = NULL;
1235   size = message_ready_size (h);
1236   if (0 != size)
1237   {
1238     LOG (GNUNET_ERROR_TYPE_DEBUG, "#  next size: %u\n", size);
1239     h->th =
1240         GNUNET_CLIENT_notify_transmit_ready (h->client, size,
1241                                              GNUNET_TIME_UNIT_FOREVER_REL,
1242                                              GNUNET_YES, &send_callback, h);
1243   }
1244   else
1245   {
1246     if (NULL != h->th_head)
1247       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  can't transmit any more\n");
1248     else
1249       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  nothing left to transmit\n");
1250   }
1251   if (GNUNET_NO == h->in_receive)
1252   {
1253     LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n");
1254     h->in_receive = GNUNET_YES;
1255     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1256                            GNUNET_TIME_UNIT_FOREVER_REL);
1257   }
1258   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() END\n");
1259   return tsize;
1260 }
1261
1262
1263 /**
1264  * Auxiliary function to send an already constructed packet to the service.
1265  * Takes care of creating a new queue element, copying the message and
1266  * calling the tmt_rdy function if necessary.
1267  * 
1268  * @param h mesh handle
1269  * @param msg message to transmit
1270  * @param tunnel tunnel this send is related to (NULL if N/A)
1271  */
1272 static void
1273 send_packet (struct GNUNET_MESH_Handle *h,
1274              const struct GNUNET_MessageHeader *msg,
1275              struct GNUNET_MESH_Tunnel *tunnel)
1276 {
1277   struct GNUNET_MESH_TransmitHandle *th;
1278   size_t msize;
1279
1280   LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n",
1281        GNUNET_MESH_DEBUG_M2S(ntohs(msg->type)));
1282   msize = ntohs (msg->size);
1283   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle) + msize);
1284   th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1285   th->size = msize;
1286   th->tunnel = tunnel;
1287   memcpy (&th[1], msg, msize);
1288   add_to_queue (h, th);
1289   LOG (GNUNET_ERROR_TYPE_DEBUG, "  queued\n");
1290   if (NULL != h->th)
1291     return;
1292   LOG (GNUNET_ERROR_TYPE_DEBUG, "  calling ntfy tmt rdy for %u bytes\n", msize);
1293   h->th =
1294       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
1295                                            GNUNET_TIME_UNIT_FOREVER_REL,
1296                                            GNUNET_YES, &send_callback, h);
1297 }
1298
1299
1300 /******************************************************************************/
1301 /**********************      API CALL DEFINITIONS     *************************/
1302 /******************************************************************************/
1303
1304 struct GNUNET_MESH_Handle *
1305 GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls,
1306                      GNUNET_MESH_InboundTunnelNotificationHandler new_tunnel,
1307                      GNUNET_MESH_TunnelEndHandler cleaner,
1308                      const struct GNUNET_MESH_MessageHandler *handlers,
1309                      const uint32_t *ports)
1310 {
1311   struct GNUNET_MESH_Handle *h;
1312
1313   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect()\n");
1314   h = GNUNET_malloc (sizeof (struct GNUNET_MESH_Handle));
1315   LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h);
1316   h->cfg = cfg;
1317   h->new_tunnel = new_tunnel;
1318   h->cleaner = cleaner;
1319   h->client = GNUNET_CLIENT_connect ("mesh", cfg);
1320   if (h->client == NULL)
1321   {
1322     GNUNET_break (0);
1323     GNUNET_free (h);
1324     return NULL;
1325   }
1326   h->cls = cls;
1327   h->message_handlers = handlers;
1328   h->ports = ports;
1329   h->next_tid = GNUNET_MESH_LOCAL_TUNNEL_ID_CLI;
1330   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1331   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1332
1333   if (NULL != ports && ports[0] != 0 && NULL == new_tunnel)
1334   {
1335     GNUNET_break (0);
1336     LOG (GNUNET_ERROR_TYPE_DEBUG,
1337          "no new tunnel handler given, ports parameter is useless!!\n");
1338   }
1339   if ((NULL == ports || ports[0] == 0) && NULL != new_tunnel)
1340   {
1341     GNUNET_break (0);
1342     LOG (GNUNET_ERROR_TYPE_DEBUG,
1343          "no ports given, new tunnel handler will never be called!!\n");
1344   }
1345   /* count handlers */
1346   for (h->n_handlers = 0;
1347        handlers && handlers[h->n_handlers].type;
1348        h->n_handlers++) ;
1349   for (h->n_ports = 0;
1350        ports && ports[h->n_ports];
1351        h->n_ports++) ;
1352   send_connect (h);
1353   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect() END\n");
1354   return h;
1355 }
1356
1357
1358 void
1359 GNUNET_MESH_disconnect (struct GNUNET_MESH_Handle *handle)
1360 {
1361   struct GNUNET_MESH_Tunnel *t;
1362   struct GNUNET_MESH_Tunnel *aux;
1363   struct GNUNET_MESH_TransmitHandle *th;
1364
1365   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH DISCONNECT\n");
1366
1367 #if DEBUG_ACK
1368   LOG (GNUNET_ERROR_TYPE_INFO, "Sent %d ACKs\n", handle->acks_sent);
1369   LOG (GNUNET_ERROR_TYPE_INFO, "Recv %d ACKs\n\n", handle->acks_recv);
1370 #endif
1371
1372   t = handle->tunnels_head;
1373   while (NULL != t)
1374   {
1375     aux = t->next;
1376     if (t->tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
1377     {
1378       GNUNET_break (0);
1379       LOG (GNUNET_ERROR_TYPE_DEBUG, "tunnel %X not destroyed\n", t->tid);
1380     }
1381     destroy_tunnel (t, GNUNET_YES);
1382     t = aux;
1383   }
1384   while ( (th = handle->th_head) != NULL)
1385   {
1386     struct GNUNET_MessageHeader *msg;
1387
1388     /* Make sure it is an allowed packet (everything else should have been
1389      * already canceled).
1390      */
1391     GNUNET_break (GNUNET_NO == th_is_payload (th));
1392     msg = (struct GNUNET_MessageHeader *) &th[1];
1393     switch (ntohs(msg->type))
1394     {
1395       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT:
1396       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY:
1397       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS:
1398       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL:
1399         break;
1400       default:
1401         GNUNET_break (0);
1402         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected msg %u\n",
1403              ntohs(msg->type));
1404     }
1405
1406     GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th);
1407     GNUNET_free (th);
1408   }
1409
1410   if (NULL != handle->th)
1411   {
1412     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1413     handle->th = NULL;
1414   }
1415   if (NULL != handle->client)
1416   {
1417     GNUNET_CLIENT_disconnect (handle->client);
1418     handle->client = NULL;
1419   }
1420   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
1421   {
1422     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1423     handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1424   }
1425   GNUNET_free (handle);
1426 }
1427
1428
1429 /**
1430  * Create a new tunnel (we're initiator and will be allowed to add/remove peers
1431  * and to broadcast).
1432  *
1433  * @param h mesh handle
1434  * @param tunnel_ctx client's tunnel context to associate with the tunnel
1435  * @param peer peer identity the tunnel should go to
1436  * @param port Port number.
1437  * @param nobuffer Flag for disabling buffering on relay nodes.
1438  * @param reliable Flag for end-to-end reliability.
1439  *
1440  * @return handle to the tunnel
1441  */
1442 struct GNUNET_MESH_Tunnel *
1443 GNUNET_MESH_tunnel_create (struct GNUNET_MESH_Handle *h, 
1444                            void *tunnel_ctx,
1445                            const struct GNUNET_PeerIdentity *peer,
1446                            uint32_t port,
1447                            int nobuffer,
1448                            int reliable)
1449 {
1450   struct GNUNET_MESH_Tunnel *t;
1451   struct GNUNET_MESH_TunnelMessage msg;
1452
1453   LOG (GNUNET_ERROR_TYPE_DEBUG,
1454        "Creating new tunnel to %s:%u\n",
1455        GNUNET_i2s (peer), port);
1456   t = create_tunnel (h, 0);
1457   LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", t);
1458   LOG (GNUNET_ERROR_TYPE_DEBUG, "  number %X\n", t->tid);
1459   t->ctx = tunnel_ctx;
1460   t->peer = GNUNET_PEER_intern (peer);
1461   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
1462   msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
1463   msg.tunnel_id = htonl (t->tid);
1464   msg.port = htonl (port);
1465   msg.peer = *peer;
1466   msg.opt = 0;
1467   if (GNUNET_YES == reliable)
1468     msg.opt |= GNUNET_MESH_OPTION_RELIABLE;
1469   if (GNUNET_YES == nobuffer)
1470     msg.opt |= GNUNET_MESH_OPTION_NOBUFFER;
1471   msg.opt = htonl (msg.opt);
1472   t->allow_send = 0;
1473   send_packet (h, &msg.header, t);
1474   return t;
1475 }
1476
1477
1478 void
1479 GNUNET_MESH_tunnel_destroy (struct GNUNET_MESH_Tunnel *tunnel)
1480 {
1481   struct GNUNET_MESH_Handle *h;
1482   struct GNUNET_MESH_TunnelMessage msg;
1483   struct GNUNET_MESH_TransmitHandle *th;
1484
1485   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying tunnel\n");
1486   h = tunnel->mesh;
1487
1488   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY);
1489   msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
1490   msg.tunnel_id = htonl (tunnel->tid);
1491   memset (&msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
1492   msg.port = 0;
1493   msg.opt = 0;
1494   th = h->th_head;
1495   while (th != NULL)
1496   {
1497     struct GNUNET_MESH_TransmitHandle *aux;
1498     if (th->tunnel == tunnel)
1499     {
1500       aux = th->next;
1501       /* FIXME call the handler? */
1502       if (GNUNET_YES == th_is_payload (th))
1503         th->notify (th->notify_cls, 0, NULL);
1504       GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1505       GNUNET_free (th);
1506       th = aux;
1507     }
1508     else
1509       th = th->next;
1510   }
1511
1512   destroy_tunnel (tunnel, GNUNET_YES);
1513   send_packet (h, &msg.header, NULL);
1514 }
1515
1516
1517 /**
1518  * Get information about a tunnel.
1519  *
1520  * @param tunnel Tunnel handle.
1521  * @param option Query (GNUNET_MESH_OPTION_*).
1522  * @param ... dependant on option, currently not used
1523  *
1524  * @return Union with an answer to the query.
1525  */
1526 const union GNUNET_MESH_TunnelInfo *
1527 GNUNET_MESH_tunnel_get_info (struct GNUNET_MESH_Tunnel *tunnel,
1528                              enum MeshTunnelOption option, ...)
1529 {
1530   const union GNUNET_MESH_TunnelInfo *ret;
1531
1532   switch (option)
1533   {
1534     case GNUNET_MESH_OPTION_NOBUFFER:
1535       ret = (const union GNUNET_MESH_TunnelInfo *) &tunnel->nobuffer;
1536       break;
1537     case GNUNET_MESH_OPTION_RELIABLE:
1538       ret = (const union GNUNET_MESH_TunnelInfo *) &tunnel->reliable;
1539       break;
1540     case GNUNET_MESH_OPTION_OOORDER:
1541       ret = (const union GNUNET_MESH_TunnelInfo *) &tunnel->ooorder;
1542       break;
1543     case GNUNET_MESH_OPTION_PEER:
1544       ret = (const union GNUNET_MESH_TunnelInfo *) &tunnel->peer;
1545       break;
1546     default:
1547       GNUNET_break (0);
1548       return NULL;
1549   }
1550
1551   return ret;
1552 }
1553
1554 struct GNUNET_MESH_TransmitHandle *
1555 GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork,
1556                                    struct GNUNET_TIME_Relative maxdelay,
1557                                    size_t notify_size,
1558                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1559                                    void *notify_cls)
1560 {
1561   struct GNUNET_MESH_TransmitHandle *th;
1562
1563   GNUNET_assert (NULL != tunnel);
1564   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY\n");
1565   LOG (GNUNET_ERROR_TYPE_DEBUG, "    on tunnel %X\n", tunnel->tid);
1566   LOG (GNUNET_ERROR_TYPE_DEBUG, "    allow_send %d\n", tunnel->allow_send);
1567   if (tunnel->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
1568     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to origin\n");
1569   else
1570     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to destination\n");
1571   LOG (GNUNET_ERROR_TYPE_DEBUG, "    payload size %u\n", notify_size);
1572   GNUNET_assert (NULL != notify);
1573   GNUNET_assert (0 == tunnel->packet_size); // Only one data packet allowed
1574   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle));
1575   th->tunnel = tunnel;
1576   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1577   th->size = notify_size + sizeof (struct GNUNET_MESH_LocalData);
1578   tunnel->packet_size = th->size;
1579   LOG (GNUNET_ERROR_TYPE_DEBUG, "    total size %u\n", th->size);
1580   th->notify = notify;
1581   th->notify_cls = notify_cls;
1582   add_to_queue (tunnel->mesh, th);
1583   if (NULL != tunnel->mesh->th)
1584     return th;
1585   if (GNUNET_NO == tunnel->allow_send)
1586     return th;
1587   LOG (GNUNET_ERROR_TYPE_DEBUG, "    call client notify tmt rdy\n");
1588   tunnel->mesh->th =
1589       GNUNET_CLIENT_notify_transmit_ready (tunnel->mesh->client, th->size,
1590                                            GNUNET_TIME_UNIT_FOREVER_REL,
1591                                            GNUNET_YES, &send_callback,
1592                                            tunnel->mesh);
1593   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY END\n");
1594   return th;
1595 }
1596
1597
1598 void
1599 GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th)
1600 {
1601   struct GNUNET_MESH_Handle *mesh;
1602
1603   th->tunnel->packet_size = 0;
1604   mesh = th->tunnel->mesh;
1605   if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1606     GNUNET_SCHEDULER_cancel (th->timeout_task);
1607   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
1608   GNUNET_free (th);
1609   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
1610   {
1611     /* queue empty, no point in asking for transmission */
1612     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
1613     mesh->th = NULL;
1614   }
1615 }
1616
1617 void
1618 GNUNET_MESH_receive_done (struct GNUNET_MESH_Tunnel *tunnel)
1619 {
1620   send_ack (tunnel);
1621 }
1622
1623
1624 /**
1625  * Request information about the running mesh peer.
1626  * The callback will be called for every tunnel known to the service,
1627  * listing all active peers that blong to the tunnel.
1628  *
1629  * If called again on the same handle, it will overwrite the previous
1630  * callback and cls. To retrieve the cls, monitor_cancel must be
1631  * called first.
1632  *
1633  * WARNING: unstable API, likely to change in the future!
1634  *
1635  * @param h Handle to the mesh peer.
1636  * @param callback Function to call with the requested data.
1637  * @param callback_cls Closure for @c callback.
1638  */
1639 void
1640 GNUNET_MESH_get_tunnels (struct GNUNET_MESH_Handle *h,
1641                          GNUNET_MESH_TunnelsCB callback,
1642                          void *callback_cls)
1643 {
1644   struct GNUNET_MessageHeader msg;
1645
1646   msg.size = htons (sizeof (msg));
1647   msg.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS);
1648   send_packet (h, &msg, NULL);
1649   h->tunnels_cb = callback;
1650   h->tunnels_cls = callback_cls;
1651
1652   return;
1653 }
1654
1655
1656 /**
1657  * Cancel a monitor request. The monitor callback will not be called.
1658  *
1659  * @param h Mesh handle.
1660  *
1661  * @return Closure given to GNUNET_MESH_monitor, if any.
1662  */
1663 void *
1664 GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h)
1665 {
1666   void *cls;
1667
1668   cls = h->tunnels_cls;
1669   h->tunnels_cb = NULL;
1670   h->tunnels_cls = NULL;
1671   return cls;
1672 }
1673
1674
1675 /**
1676  * Request information about a specific tunnel of the running mesh peer.
1677  *
1678  * WARNING: unstable API, likely to change in the future!
1679  * FIXME Add destination option.
1680  *
1681  * @param h Handle to the mesh peer.
1682  * @param initiator ID of the owner of the tunnel.
1683  * @param tunnel_number Tunnel number.
1684  * @param callback Function to call with the requested data.
1685  * @param callback_cls Closure for @c callback.
1686  */
1687 void
1688 GNUNET_MESH_show_tunnel (struct GNUNET_MESH_Handle *h,
1689                          struct GNUNET_PeerIdentity *initiator,
1690                          unsigned int tunnel_number,
1691                          GNUNET_MESH_TunnelCB callback,
1692                          void *callback_cls)
1693 {
1694   struct GNUNET_MESH_LocalMonitor msg;
1695
1696   msg.header.size = htons (sizeof (msg));
1697   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNEL);
1698   msg.owner = *initiator;
1699   msg.tunnel_id = htonl (tunnel_number);
1700   msg.reserved = 0;
1701   send_packet (h, &msg.header, NULL);
1702   h->tunnel_cb = callback;
1703   h->tunnel_cls = callback_cls;
1704
1705   return;
1706 }
1707
1708
1709 /**
1710  * Function called to notify a client about the connection
1711  * begin ready to queue more data.  "buf" will be
1712  * NULL and "size" zero if the connection was closed for
1713  * writing in the meantime.
1714  *
1715  * @param cls closure
1716  * @param size number of bytes available in buf
1717  * @param buf where the callee should write the message
1718  * @return number of bytes written to buf
1719  */
1720 static size_t
1721 mesh_mq_ntr (void *cls, size_t size,
1722              void *buf)
1723 {
1724   struct GNUNET_MQ_Handle *mq = cls; 
1725   struct MeshMQState *state = GNUNET_MQ_impl_state (mq);
1726   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
1727   uint16_t msize;
1728
1729   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "mesh-mq",
1730                    "writing message (t: %u, s: %u) to buffer\n",
1731                    ntohs (msg->type), ntohs (msg->size));
1732
1733   state->th = NULL;
1734   if (NULL == buf)
1735   {
1736     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
1737     return 0;
1738   }
1739   msize = ntohs (msg->size);
1740   GNUNET_assert (msize <= size);
1741   memcpy (buf, msg, msize);
1742   GNUNET_MQ_impl_send_continue (mq);
1743   return msize;
1744 }
1745
1746
1747 /**
1748  * Signature of functions implementing the
1749  * sending functionality of a message queue.
1750  *
1751  * @param mq the message queue
1752  * @param msg the message to send
1753  * @param impl_state state of the implementation
1754  */
1755 static void
1756 mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq,
1757                    const struct GNUNET_MessageHeader *msg, void *impl_state)
1758 {
1759   struct MeshMQState *state = impl_state;
1760
1761   GNUNET_assert (NULL == state->th);
1762   GNUNET_MQ_impl_send_commit (mq);
1763   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "mesh-mq",
1764                    "calling ntr for message (t: %u, s: %u)\n",
1765                    ntohs (msg->type), ntohs (msg->size));
1766   state->th =
1767       GNUNET_MESH_notify_transmit_ready (state->tunnel,
1768                                          /* FIXME: add option for corking */
1769                                          GNUNET_NO,
1770                                          GNUNET_TIME_UNIT_FOREVER_REL, 
1771                                          ntohs (msg->size),
1772                                          mesh_mq_ntr, mq);
1773
1774 }
1775
1776
1777 /**
1778  * Signature of functions implementing the
1779  * destruction of a message queue.
1780  * Implementations must not free 'mq', but should
1781  * take care of 'impl_state'.
1782  * 
1783  * @param mq the message queue to destroy
1784  * @param impl_state state of the implementation
1785  */
1786 static void
1787 mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
1788 {
1789   struct MeshMQState *state = impl_state;
1790
1791   if (NULL != state->th)
1792     GNUNET_MESH_notify_transmit_ready_cancel (state->th);
1793
1794   GNUNET_free (state);
1795 }
1796
1797
1798 /**
1799  * Create a message queue for a mesh tunnel.
1800  * The message queue can only be used to transmit messages,
1801  * not to receive them.
1802  *
1803  * @param tunnel the tunnel to create the message qeue for
1804  * @return a message queue to messages over the tunnel
1805  */
1806 struct GNUNET_MQ_Handle *
1807 GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel)
1808 {
1809   struct GNUNET_MQ_Handle *mq;
1810   struct MeshMQState *state;
1811
1812   state = GNUNET_new (struct MeshMQState);
1813   state->tunnel = tunnel;
1814
1815   mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl,
1816                                       mesh_mq_destroy_impl,
1817                                       NULL, /* FIXME: cancel impl. */
1818                                       state,
1819                                       NULL, /* no msg handlers */
1820                                       NULL, /* no err handlers */
1821                                       NULL); /* no handler cls */
1822   return mq;
1823 }
1824