-only notify AFTER sending is really close to finished, not before
[oweals/gnunet.git] / src / mesh / mesh_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4      GNUnet is free software; you can redistribute it and/or modify
5      it under the terms of the GNU General Public License as published
6      by the Free Software Foundation; either version 3, or (at your
7      option) any later version.
8      GNUnet is distributed in the hope that it will be useful, but
9      WITHOUT ANY WARRANTY; without even the implied warranty of
10      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11      General Public License for more details.
12      You should have received a copy of the GNU General Public License
13      along with GNUnet; see the file COPYING.  If not, write to the
14      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
15      Boston, MA 02111-1307, USA.
16 */
17
18 /**
19  * @file mesh/mesh_api.c
20  * @brief mesh api: client implementation of new mesh service
21  * @author Bartlomiej Polot
22  */
23
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_mesh_service.h"
27 #include "mesh.h"
28 #include "mesh_protocol.h"
29
30 #define LOG(kind,...) GNUNET_log_from (kind, "mesh-api",__VA_ARGS__)
31
32 /******************************************************************************/
33 /************************      DATA STRUCTURES     ****************************/
34 /******************************************************************************/
35
36 /**
37  * Transmission queue to the service
38  */
39 struct GNUNET_MESH_TransmitHandle
40 {
41
42     /**
43      * Double Linked list
44      */
45   struct GNUNET_MESH_TransmitHandle *next;
46
47     /**
48      * Double Linked list
49      */
50   struct GNUNET_MESH_TransmitHandle *prev;
51
52     /**
53      * Channel this message is sent on / for (may be NULL for control messages).
54      */
55   struct GNUNET_MESH_Channel *channel;
56
57     /**
58      * Callback to obtain the message to transmit, or NULL if we
59      * got the message in 'data'.  Notice that messages built
60      * by 'notify' need to be encapsulated with information about
61      * the 'target'.
62      */
63   GNUNET_CONNECTION_TransmitReadyNotify notify;
64
65     /**
66      * Closure for 'notify'
67      */
68   void *notify_cls;
69
70     /**
71      * How long is this message valid.  Once the timeout has been
72      * reached, the message must no longer be sent.  If this
73      * is a message with a 'notify' callback set, the 'notify'
74      * function should be called with 'buf' NULL and size 0.
75      */
76   struct GNUNET_TIME_Absolute timeout;
77
78     /**
79      * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER.
80      */
81   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
82
83     /**
84      * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL.
85      */
86   size_t size;
87 };
88
89
90 /**
91  * Opaque handle to the service.
92  */
93 struct GNUNET_MESH_Handle
94 {
95
96     /**
97      * Handle to the server connection, to send messages later
98      */
99   struct GNUNET_CLIENT_Connection *client;
100
101     /**
102      * Set of handlers used for processing incoming messages in the channels
103      */
104   const struct GNUNET_MESH_MessageHandler *message_handlers;
105
106   /**
107    * Number of handlers in the handlers array.
108    */
109   unsigned int n_handlers;
110
111   /**
112    * Ports open.
113    */
114   const uint32_t *ports;
115
116   /**
117    * Number of ports.
118    */
119   unsigned int n_ports;
120
121     /**
122      * Double linked list of the channels this client is connected to, head.
123      */
124   struct GNUNET_MESH_Channel *channels_head;
125
126     /**
127      * Double linked list of the channels this client is connected to, tail.
128      */
129   struct GNUNET_MESH_Channel *channels_tail;
130
131     /**
132      * Callback for inbound channel creation
133      */
134   GNUNET_MESH_InboundChannelNotificationHandler *new_channel;
135
136     /**
137      * Callback for inbound channel disconnection
138      */
139   GNUNET_MESH_ChannelEndHandler *cleaner;
140
141     /**
142      * Handle to cancel pending transmissions in case of disconnection
143      */
144   struct GNUNET_CLIENT_TransmitHandle *th;
145
146     /**
147      * Closure for all the handlers given by the client
148      */
149   void *cls;
150
151     /**
152      * Messages to send to the service, head.
153      */
154   struct GNUNET_MESH_TransmitHandle *th_head;
155
156     /**
157      * Messages to send to the service, tail.
158      */
159   struct GNUNET_MESH_TransmitHandle *th_tail;
160
161     /**
162      * chid of the next channel to create (to avoid reusing IDs often)
163      */
164   MESH_ChannelNumber next_chid;
165
166     /**
167      * Have we started the task to receive messages from the service
168      * yet? We do this after we send the 'MESH_LOCAL_CONNECT' message.
169      */
170   int in_receive;
171
172   /**
173    * Configuration given by the client, in case of reconnection
174    */
175   const struct GNUNET_CONFIGURATION_Handle *cfg;
176
177   /**
178    * Time to the next reconnect in case one reconnect fails
179    */
180   struct GNUNET_TIME_Relative reconnect_time;
181
182   /**
183    * Task for trying to reconnect.
184    */
185   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
186
187   /**
188    * Monitor callback
189    */
190   GNUNET_MESH_ChannelsCB channels_cb;
191
192   /**
193    * Monitor callback closure.
194    */
195   void *channels_cls;
196
197   /**
198    * Channel callback.
199    */
200   GNUNET_MESH_ChannelCB channel_cb;
201
202   /**
203    * Channel callback closure.
204    */
205   void *channel_cls;
206 };
207
208
209 /**
210  * Description of a peer
211  */
212 struct GNUNET_MESH_Peer
213 {
214     /**
215      * ID of the peer in short form
216      */
217   GNUNET_PEER_Id id;
218
219   /**
220    * Channel this peer belongs to
221    */
222   struct GNUNET_MESH_Channel *t;
223
224   /**
225    * Flag indicating whether service has informed about its connection
226    * FIXME-BART: is this flag used? Seems dead right now...
227    */
228   int connected;
229
230 };
231
232
233 /**
234  * Opaque handle to a channel.
235  */
236 struct GNUNET_MESH_Channel
237 {
238
239     /**
240      * DLL next
241      */
242   struct GNUNET_MESH_Channel *next;
243
244     /**
245      * DLL prev
246      */
247   struct GNUNET_MESH_Channel *prev;
248
249     /**
250      * Handle to the mesh this channel belongs to
251      */
252   struct GNUNET_MESH_Handle *mesh;
253
254     /**
255      * Local ID of the channel
256      */
257   MESH_ChannelNumber chid;
258
259     /**
260      * Port number.
261      */
262   uint32_t port;
263
264     /**
265      * Other end of the channel.
266      */
267   GNUNET_PEER_Id peer;
268
269   /**
270    * Any data the caller wants to put in here
271    */
272   void *ctx;
273
274     /**
275      * Size of packet queued in this channel
276      */
277   unsigned int packet_size;
278
279     /**
280      * Is the channel allowed to buffer?
281      */
282   int nobuffer;
283
284     /**
285      * Is the channel realiable?
286      */
287   int reliable;
288
289     /**
290      * If reliable, is the channel out of order?
291      */
292   int ooorder;
293
294     /**
295      * Are we allowed to send to the service?
296      */
297   int allow_send;
298
299 };
300
301
302 /**
303  * Implementation state for mesh's message queue.
304  */
305 struct MeshMQState
306 {
307   /**
308    * The current transmit handle, or NULL
309    * if no transmit is active.
310    */
311   struct GNUNET_MESH_TransmitHandle *th;
312
313   /**
314    * Channel to send the data over.
315    */
316   struct GNUNET_MESH_Channel *channel;
317 };
318
319
320 /******************************************************************************/
321 /***********************         DECLARATIONS         *************************/
322 /******************************************************************************/
323
324 /**
325  * Function called to send a message to the service.
326  * "buf" will be NULL and "size" zero if the socket was closed for writing in
327  * the meantime.
328  *
329  * @param cls closure, the mesh handle
330  * @param size number of bytes available in buf
331  * @param buf where the callee should write the connect message
332  * @return number of bytes written to buf
333  */
334 static size_t
335 send_callback (void *cls, size_t size, void *buf);
336
337
338 /******************************************************************************/
339 /***********************     AUXILIARY FUNCTIONS      *************************/
340 /******************************************************************************/
341
342 /**
343  * Check if transmission is a payload packet.
344  *
345  * @param th Transmission handle.
346  *
347  * @return GNUNET_YES if it is a payload packet,
348  *         GNUNET_NO if it is a mesh management packet.
349  */
350 static int
351 th_is_payload (struct GNUNET_MESH_TransmitHandle *th)
352 {
353   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
354 }
355
356
357 /**
358  * Check whether there is any message ready in the queue and find the size.
359  *
360  * @param h Mesh handle.
361  *
362  * @return The size of the first ready message in the queue,
363  *         0 if there is none.
364  */
365 static size_t
366 message_ready_size (struct GNUNET_MESH_Handle *h)
367 {
368   struct GNUNET_MESH_TransmitHandle *th;
369   struct GNUNET_MESH_Channel *ch;
370
371   for (th = h->th_head; NULL != th; th = th->next)
372   {
373     ch = th->channel;
374     if (GNUNET_NO == th_is_payload (th))
375     {
376       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message internal\n");
377       return th->size;
378     }
379     if (GNUNET_YES == ch->allow_send)
380     {
381       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message payload ok\n");
382       return th->size;
383     }
384   }
385   return 0;
386 }
387
388
389 /**
390  * Get the channel handler for the channel specified by id from the given handle
391  * @param h Mesh handle
392  * @param chid ID of the wanted channel
393  * @return handle to the required channel or NULL if not found
394  */
395 static struct GNUNET_MESH_Channel *
396 retrieve_channel (struct GNUNET_MESH_Handle *h, MESH_ChannelNumber chid)
397 {
398   struct GNUNET_MESH_Channel *ch;
399
400   ch = h->channels_head;
401   while (ch != NULL)
402   {
403     if (ch->chid == chid)
404       return ch;
405     ch = ch->next;
406   }
407   return NULL;
408 }
409
410
411 /**
412  * Create a new channel and insert it in the channel list of the mesh handle
413  *
414  * @param h Mesh handle
415  * @param chid Desired chid of the channel, 0 to assign one automatically.
416  *
417  * @return Handle to the created channel.
418  */
419 static struct GNUNET_MESH_Channel *
420 create_channel (struct GNUNET_MESH_Handle *h, MESH_ChannelNumber chid)
421 {
422   struct GNUNET_MESH_Channel *ch;
423
424   ch = GNUNET_malloc (sizeof (struct GNUNET_MESH_Channel));
425   GNUNET_CONTAINER_DLL_insert (h->channels_head, h->channels_tail, ch);
426   ch->mesh = h;
427   if (0 == chid)
428   {
429     ch->chid = h->next_chid;
430     while (NULL != retrieve_channel (h, h->next_chid))
431     {
432       h->next_chid++;
433       h->next_chid &= ~GNUNET_MESH_LOCAL_CHANNEL_ID_SERV;
434       h->next_chid |= GNUNET_MESH_LOCAL_CHANNEL_ID_CLI;
435     }
436   }
437   else
438   {
439     ch->chid = chid;
440   }
441   ch->allow_send = GNUNET_NO;
442   ch->nobuffer = GNUNET_NO;
443   return ch;
444 }
445
446
447 /**
448  * Destroy the specified channel.
449  * - Destroys all peers, calling the disconnect callback on each if needed
450  * - Cancels all outgoing traffic for that channel, calling respective notifys
451  * - Calls cleaner if channel was inbound
452  * - Frees all memory used
453  *
454  * @param ch Pointer to the channel.
455  * @param call_cleaner Whether to call the cleaner handler.
456  *
457  * @return Handle to the required channel or NULL if not found.
458  */
459 static void
460 destroy_channel (struct GNUNET_MESH_Channel *ch, int call_cleaner)
461 {
462   struct GNUNET_MESH_Handle *h;
463   struct GNUNET_MESH_TransmitHandle *th;
464   struct GNUNET_MESH_TransmitHandle *next;
465
466   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroy_channel %X\n", ch->chid);
467
468   if (NULL == ch)
469   {
470     GNUNET_break (0);
471     return;
472   }
473   h = ch->mesh;
474
475   GNUNET_CONTAINER_DLL_remove (h->channels_head, h->channels_tail, ch);
476
477   /* signal channel destruction */
478   if ( (NULL != h->cleaner) && (0 != ch->peer) && (GNUNET_YES == call_cleaner) )
479     h->cleaner (h->cls, ch, ch->ctx);
480
481   /* check that clients did not leave messages behind in the queue */
482   for (th = h->th_head; NULL != th; th = next)
483   {
484     next = th->next;
485     if (th->channel != ch)
486       continue;
487     /* Clients should have aborted their requests already.
488      * Management traffic should be ok, as clients can't cancel that */
489     GNUNET_break (GNUNET_NO == th_is_payload (th));
490     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
491
492     /* clean up request */
493     if (GNUNET_SCHEDULER_NO_TASK != th->timeout_task)
494       GNUNET_SCHEDULER_cancel (th->timeout_task);
495     GNUNET_free (th);
496   }
497
498   /* if there are no more pending requests with mesh service, cancel active request */
499   /* Note: this should be unnecessary... */
500   if ((0 == message_ready_size (h)) && (NULL != h->th))
501   {
502     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
503     h->th = NULL;
504   }
505
506   if (0 != ch->peer)
507     GNUNET_PEER_change_rc (ch->peer, -1);
508   GNUNET_free (ch);
509   return;
510 }
511
512
513 /**
514  * Notify client that the transmission has timed out
515  *
516  * @param cls closure
517  * @param tc task context
518  */
519 static void
520 timeout_transmission (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
521 {
522   struct GNUNET_MESH_TransmitHandle *th = cls;
523   struct GNUNET_MESH_Handle *mesh;
524
525   mesh = th->channel->mesh;
526   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
527   th->channel->packet_size = 0;
528   if (GNUNET_YES == th_is_payload (th))
529     th->notify (th->notify_cls, 0, NULL);
530   GNUNET_free (th);
531   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
532   {
533     /* nothing ready to transmit, no point in asking for transmission */
534     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
535     mesh->th = NULL;
536   }
537 }
538
539
540 /**
541  * Add a transmit handle to the transmission queue and set the
542  * timeout if needed.
543  *
544  * @param h mesh handle with the queue head and tail
545  * @param th handle to the packet to be transmitted
546  */
547 static void
548 add_to_queue (struct GNUNET_MESH_Handle *h,
549               struct GNUNET_MESH_TransmitHandle *th)
550 {
551   GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
552   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us == th->timeout.abs_value_us)
553     return;
554   th->timeout_task =
555       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
556                                     (th->timeout), &timeout_transmission, th);
557 }
558
559
560 /**
561  * Auxiliary function to send an already constructed packet to the service.
562  * Takes care of creating a new queue element, copying the message and
563  * calling the tmt_rdy function if necessary.
564  *
565  * @param h mesh handle
566  * @param msg message to transmit
567  * @param channel channel this send is related to (NULL if N/A)
568  */
569 static void
570 send_packet (struct GNUNET_MESH_Handle *h,
571              const struct GNUNET_MessageHeader *msg,
572              struct GNUNET_MESH_Channel *channel);
573
574
575 /**
576  * Send an ack on the channel to confirm the processing of a message.
577  *
578  * @param ch Channel on which to send the ACK.
579  */
580 static void
581 send_ack (struct GNUNET_MESH_Channel *ch)
582 {
583   struct GNUNET_MESH_LocalAck msg;
584
585   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on channel %X\n", ch->chid);
586   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
587   msg.header.size = htons (sizeof (msg));
588   msg.channel_id = htonl (ch->chid);
589
590   send_packet (ch->mesh, &msg.header, ch);
591   return;
592 }
593
594
595
596 /**
597  * Reconnect callback: tries to reconnect again after a failer previous
598  * reconnecttion
599  * @param cls closure (mesh handle)
600  * @param tc task context
601  */
602 static void
603 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
604
605
606 /**
607  * Send a connect packet to the service with the applications and types
608  * requested by the user.
609  *
610  * @param h The mesh handle.
611  *
612  */
613 static void
614 send_connect (struct GNUNET_MESH_Handle *h)
615 {
616   size_t size;
617
618   size = sizeof (struct GNUNET_MESH_ClientConnect);
619   size += h->n_ports * sizeof (uint32_t);
620   {
621     char buf[size] GNUNET_ALIGN;
622     struct GNUNET_MESH_ClientConnect *msg;
623     uint32_t *ports;
624     uint16_t i;
625
626     /* build connection packet */
627     msg = (struct GNUNET_MESH_ClientConnect *) buf;
628     msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT);
629     msg->header.size = htons (size);
630     ports = (uint32_t *) &msg[1];
631     for (i = 0; i < h->n_ports; i++)
632     {
633       ports[i] = htonl (h->ports[i]);
634       LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n",
635            h->ports[i]);
636     }
637     LOG (GNUNET_ERROR_TYPE_DEBUG,
638          "Sending %lu bytes long message with %u ports\n",
639          ntohs (msg->header.size), h->n_ports);
640     send_packet (h, &msg->header, NULL);
641   }
642 }
643
644
645 /**
646  * Reconnect to the service, retransmit all infomation to try to restore the
647  * original state.
648  *
649  * @param h handle to the mesh
650  *
651  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
652  */
653 static int
654 do_reconnect (struct GNUNET_MESH_Handle *h)
655 {
656   struct GNUNET_MESH_Channel *ch;
657
658   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
659   LOG (GNUNET_ERROR_TYPE_DEBUG, "*******   RECONNECT   *******\n");
660   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
661   LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h);
662   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
663
664   /* disconnect */
665   if (NULL != h->th)
666   {
667     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
668     h->th = NULL;
669   }
670   if (NULL != h->client)
671   {
672     GNUNET_CLIENT_disconnect (h->client);
673   }
674
675   /* connect again */
676   h->client = GNUNET_CLIENT_connect ("mesh", h->cfg);
677   if (h->client == NULL)
678   {
679     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
680                                                       &reconnect_cbk, h);
681     h->reconnect_time =
682         GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
683                                   GNUNET_TIME_relative_multiply
684                                   (h->reconnect_time, 2));
685     LOG (GNUNET_ERROR_TYPE_DEBUG, "Next retry in %s\n",
686          GNUNET_STRINGS_relative_time_to_string (h->reconnect_time,
687                                                  GNUNET_NO));
688     GNUNET_break (0);
689     return GNUNET_NO;
690   }
691   else
692   {
693     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
694   }
695   send_connect (h);
696   /* Rebuild all channels */
697   for (ch = h->channels_head; NULL != ch; ch = ch->next)
698   {
699     struct GNUNET_MESH_ChannelMessage tmsg;
700     uint32_t options;
701
702     if (ch->chid >= GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
703     {
704       /* Channel was created by service (incoming channel) */
705       /* TODO: Notify service of missing channel, to request
706        * creator to recreate path (find a path to him via DHT?)
707        */
708       continue;
709     }
710     ch->allow_send = GNUNET_NO;
711     tmsg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE);
712     tmsg.header.size = htons (sizeof (struct GNUNET_MESH_ChannelMessage));
713     tmsg.channel_id = htonl (ch->chid);
714     tmsg.port = htonl (ch->port);
715     GNUNET_PEER_resolve (ch->peer, &tmsg.peer);
716
717     options = 0;
718     if (GNUNET_YES == ch->nobuffer)
719       options |= GNUNET_MESH_OPTION_NOBUFFER;
720
721     if (GNUNET_YES == ch->reliable)
722       options |= GNUNET_MESH_OPTION_RELIABLE;
723
724     tmsg.opt = htonl (options);
725     send_packet (h, &tmsg.header, ch);
726   }
727   return GNUNET_YES;
728 }
729
730 /**
731  * Reconnect callback: tries to reconnect again after a failer previous
732  * reconnecttion
733  * @param cls closure (mesh handle)
734  * @param tc task context
735  */
736 static void
737 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
738 {
739   struct GNUNET_MESH_Handle *h = cls;
740
741   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
742   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
743     return;
744   do_reconnect (h);
745 }
746
747
748 /**
749  * Reconnect to the service, retransmit all infomation to try to restore the
750  * original state.
751  *
752  * @param h handle to the mesh
753  *
754  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
755  */
756 static void
757 reconnect (struct GNUNET_MESH_Handle *h)
758 {
759   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requested RECONNECT\n");
760   h->in_receive = GNUNET_NO;
761   if (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task)
762     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
763                                                       &reconnect_cbk, h);
764 }
765
766
767 /******************************************************************************/
768 /***********************      RECEIVE HANDLERS     ****************************/
769 /******************************************************************************/
770
771 /**
772  * Process the new channel notification and add it to the channels in the handle
773  *
774  * @param h     The mesh handle
775  * @param msg   A message with the details of the new incoming channel
776  */
777 static void
778 process_channel_created (struct GNUNET_MESH_Handle *h,
779                         const struct GNUNET_MESH_ChannelMessage *msg)
780 {
781   struct GNUNET_MESH_Channel *ch;
782   MESH_ChannelNumber chid;
783   uint32_t port;
784
785   chid = ntohl (msg->channel_id);
786   port = ntohl (msg->port);
787   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming channel %X:%u\n", chid, port);
788   if (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
789   {
790     GNUNET_break (0);
791     return;
792   }
793   if (NULL != h->new_channel)
794   {
795     ch = create_channel (h, chid);
796     ch->allow_send = GNUNET_NO;
797     ch->peer = GNUNET_PEER_intern (&msg->peer);
798     ch->mesh = h;
799     ch->chid = chid;
800     ch->port = port;
801     if (0 != (msg->opt & GNUNET_MESH_OPTION_NOBUFFER))
802       ch->nobuffer = GNUNET_YES;
803     else
804       ch->nobuffer = GNUNET_NO;
805
806     if (0 != (msg->opt & GNUNET_MESH_OPTION_RELIABLE))
807       ch->reliable = GNUNET_YES;
808     else
809       ch->reliable = GNUNET_NO;
810
811     if (GNUNET_YES == ch->reliable &&
812         0 != (msg->opt & GNUNET_MESH_OPTION_OOORDER))
813       ch->ooorder = GNUNET_YES;
814     else
815       ch->ooorder = GNUNET_NO;
816
817     LOG (GNUNET_ERROR_TYPE_DEBUG, "  created channel %p\n", ch);
818     ch->ctx = h->new_channel (h->cls, ch, &msg->peer, ch->port);
819     LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
820   }
821   else
822   {
823     struct GNUNET_MESH_ChannelMessage d_msg;
824
825     LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n");
826
827     d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY);
828     d_msg.header.size = htons (sizeof (struct GNUNET_MESH_ChannelMessage));
829     d_msg.channel_id = msg->channel_id;
830     memset (&d_msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
831     d_msg.port = 0;
832     d_msg.opt = 0;
833
834     send_packet (h, &d_msg.header, NULL);
835   }
836   return;
837 }
838
839
840 /**
841  * Process the channel destroy notification and free associated resources
842  *
843  * @param h     The mesh handle
844  * @param msg   A message with the details of the channel being destroyed
845  */
846 static void
847 process_channel_destroy (struct GNUNET_MESH_Handle *h,
848                          const struct GNUNET_MESH_ChannelMessage *msg)
849 {
850   struct GNUNET_MESH_Channel *ch;
851   MESH_ChannelNumber chid;
852
853   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel from service\n");
854   chid = ntohl (msg->channel_id);
855   ch = retrieve_channel (h, chid);
856
857   if (NULL == ch)
858   {
859     LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X unknown\n", chid);
860     return;
861   }
862   LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X destroyed\n", ch->chid);
863   destroy_channel (ch, GNUNET_YES);
864 }
865
866
867 /**
868  * Process the incoming data packets, call appropriate handlers.
869  *
870  * @param h         The mesh handle
871  * @param message   A message encapsulating the data
872  */
873 static void
874 process_incoming_data (struct GNUNET_MESH_Handle *h,
875                        const struct GNUNET_MessageHeader *message)
876 {
877   const struct GNUNET_MessageHeader *payload;
878   const struct GNUNET_MESH_MessageHandler *handler;
879   struct GNUNET_MESH_LocalData *dmsg;
880   struct GNUNET_MESH_Channel *ch;
881   unsigned int i;
882   uint16_t type;
883
884   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n");
885
886   dmsg = (struct GNUNET_MESH_LocalData *) message;
887
888   ch = retrieve_channel (h, ntohl (dmsg->id));
889   payload = (struct GNUNET_MessageHeader *) &dmsg[1];
890   LOG (GNUNET_ERROR_TYPE_DEBUG, "  %s data on channel %s [%X]\n",
891        ch->chid >= GNUNET_MESH_LOCAL_CHANNEL_ID_SERV ? "fwd" : "bck",
892        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (dmsg->id));
893   if (NULL == ch)
894   {
895     /* Channel was ignored/destroyed, probably service didn't get it yet */
896     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ignored!\n");
897     return;
898   }
899   type = ntohs (payload->type);
900   LOG (GNUNET_ERROR_TYPE_DEBUG, "  payload type %u\n", type);
901   for (i = 0; i < h->n_handlers; i++)
902   {
903     handler = &h->message_handlers[i];
904     LOG (GNUNET_ERROR_TYPE_DEBUG,
905          "    checking handler for type %u\n",
906          handler->type);
907     if (handler->type == type)
908     {
909       if (GNUNET_OK !=
910           handler->callback (h->cls, ch, &ch->ctx, payload))
911       {
912         LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
913         GNUNET_MESH_channel_destroy (ch);
914         return;
915       }
916       else
917       {
918         LOG (GNUNET_ERROR_TYPE_DEBUG,
919              "callback completed successfully\n");
920       }
921     }
922   }
923 }
924
925
926 /**
927  * Process a local ACK message, enabling the client to send
928  * more data to the service.
929  *
930  * @param h Mesh handle.
931  * @param message Message itself.
932  */
933 static void
934 process_ack (struct GNUNET_MESH_Handle *h,
935              const struct GNUNET_MessageHeader *message)
936 {
937   struct GNUNET_MESH_LocalAck *msg;
938   struct GNUNET_MESH_Channel *ch;
939   MESH_ChannelNumber chid;
940
941   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n");
942   msg = (struct GNUNET_MESH_LocalAck *) message;
943   chid = ntohl (msg->channel_id);
944     ch = retrieve_channel (h, chid);
945   if (NULL == ch)
946   {
947     LOG (GNUNET_ERROR_TYPE_WARNING, "ACK on unknown channel %X\n", chid);
948     return;
949   }
950   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on channel %X!\n", ch->chid);
951   ch->allow_send = GNUNET_YES;
952   if (NULL == h->th && 0 < ch->packet_size)
953   {
954     LOG (GNUNET_ERROR_TYPE_DEBUG, "  tmt rdy was NULL, requesting!\n");
955     h->th =
956         GNUNET_CLIENT_notify_transmit_ready (h->client, ch->packet_size,
957                                              GNUNET_TIME_UNIT_FOREVER_REL,
958                                              GNUNET_YES, &send_callback, h);
959   }
960 }
961
962
963 /*
964  * Process a local reply about info on all channels, pass info to the user.
965  *
966  * @param h Mesh handle.
967  * @param message Message itself.
968  */
969 // static void
970 // process_get_channels (struct GNUNET_MESH_Handle *h,
971 //                      const struct GNUNET_MessageHeader *message)
972 // {
973 //   struct GNUNET_MESH_LocalMonitor *msg;
974 //
975 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
976 //
977 //   if (NULL == h->channels_cb)
978 //   {
979 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
980 //     return;
981 //   }
982 //
983 //   msg = (struct GNUNET_MESH_LocalMonitor *) message;
984 //   if (ntohs (message->size) !=
985 //       (sizeof (struct GNUNET_MESH_LocalMonitor) +
986 //        sizeof (struct GNUNET_PeerIdentity)))
987 //   {
988 //     GNUNET_break_op (0);
989 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
990 //                 "Get channels message: size %hu - expected %u\n",
991 //                 ntohs (message->size),
992 //                 sizeof (struct GNUNET_MESH_LocalMonitor));
993 //     return;
994 //   }
995 //   h->channels_cb (h->channels_cls,
996 //                   ntohl (msg->channel_id),
997 //                   &msg->owner,
998 //                   &msg->destination);
999 // }
1000
1001
1002
1003 /*
1004  * Process a local monitor_channel reply, pass info to the user.
1005  *
1006  * @param h Mesh handle.
1007  * @param message Message itself.
1008  */
1009 // static void
1010 // process_show_channel (struct GNUNET_MESH_Handle *h,
1011 //                      const struct GNUNET_MessageHeader *message)
1012 // {
1013 //   struct GNUNET_MESH_LocalMonitor *msg;
1014 //   size_t esize;
1015 //
1016 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
1017 //
1018 //   if (NULL == h->channel_cb)
1019 //   {
1020 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1021 //     return;
1022 //   }
1023 //
1024 //   /* Verify message sanity */
1025 //   msg = (struct GNUNET_MESH_LocalMonitor *) message;
1026 //   esize = sizeof (struct GNUNET_MESH_LocalMonitor);
1027 //   if (ntohs (message->size) != esize)
1028 //   {
1029 //     GNUNET_break_op (0);
1030 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1031 //                 "Show channel message: size %hu - expected %u\n",
1032 //                 ntohs (message->size),
1033 //                 esize);
1034 //
1035 //     h->channel_cb (h->channel_cls, NULL, NULL);
1036 //     h->channel_cb = NULL;
1037 //     h->channel_cls = NULL;
1038 //
1039 //     return;
1040 //   }
1041 //
1042 //   h->channel_cb (h->channel_cls,
1043 //                  &msg->destination,
1044 //                  &msg->owner);
1045 // }
1046
1047
1048 /**
1049  * Function to process all messages received from the service
1050  *
1051  * @param cls closure
1052  * @param msg message received, NULL on timeout or fatal error
1053  */
1054 static void
1055 msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
1056 {
1057   struct GNUNET_MESH_Handle *h = cls;
1058   uint16_t type;
1059
1060   if (msg == NULL)
1061   {
1062     LOG (GNUNET_ERROR_TYPE_DEBUG,
1063          "Mesh service disconnected, reconnecting\n", h);
1064     reconnect (h);
1065     return;
1066   }
1067   type = ntohs (msg->type);
1068   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1069   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n",
1070        GNUNET_MESH_DEBUG_M2S (type));
1071   switch (type)
1072   {
1073     /* Notify of a new incoming channel */
1074   case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE:
1075     process_channel_created (h, (struct GNUNET_MESH_ChannelMessage *) msg);
1076     break;
1077     /* Notify of a channel disconnection */
1078   case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY:
1079   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_NACK:
1080     process_channel_destroy (h, (struct GNUNET_MESH_ChannelMessage *) msg);
1081     break;
1082   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA:
1083     process_incoming_data (h, msg);
1084     break;
1085   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
1086     process_ack (h, msg);
1087     break;
1088 //   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNELS: DEPRECATED
1089 //     process_get_channels (h, msg);
1090 //     break;
1091 //   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNEL: DEPRECATED
1092 //     process_show_channel (h, msg);
1093 //     break;
1094   default:
1095     /* We shouldn't get any other packages, log and ignore */
1096     LOG (GNUNET_ERROR_TYPE_WARNING,
1097          "unsolicited message form service (type %s)\n",
1098          GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
1099   }
1100   LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n");
1101   if (GNUNET_YES == h->in_receive)
1102   {
1103     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1104                            GNUNET_TIME_UNIT_FOREVER_REL);
1105   }
1106   else
1107   {
1108     LOG (GNUNET_ERROR_TYPE_DEBUG,
1109          "in receive off, not calling CLIENT_receive\n");
1110   }
1111 }
1112
1113
1114 /******************************************************************************/
1115 /************************       SEND FUNCTIONS     ****************************/
1116 /******************************************************************************/
1117
1118 /**
1119  * Function called to send a message to the service.
1120  * "buf" will be NULL and "size" zero if the socket was closed for writing in
1121  * the meantime.
1122  *
1123  * @param cls closure, the mesh handle
1124  * @param size number of bytes available in buf
1125  * @param buf where the callee should write the connect message
1126  * @return number of bytes written to buf
1127  */
1128 static size_t
1129 send_callback (void *cls, size_t size, void *buf)
1130 {
1131   struct GNUNET_MESH_Handle *h = cls;
1132   struct GNUNET_MESH_TransmitHandle *th;
1133   struct GNUNET_MESH_TransmitHandle *next;
1134   struct GNUNET_MESH_Channel *ch;
1135   char *cbuf = buf;
1136   size_t tsize;
1137   size_t psize;
1138   size_t nsize;
1139
1140   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1141   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() Buffer %u\n", size);
1142   if ((0 == size) || (NULL == buf))
1143   {
1144     LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h);
1145     reconnect (h);
1146     h->th = NULL;
1147     return 0;
1148   }
1149   tsize = 0;
1150   next = h->th_head;
1151   nsize = message_ready_size (h);
1152   while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
1153   {
1154     ch = th->channel;
1155     if (GNUNET_YES == th_is_payload (th))
1156     {
1157       struct GNUNET_MESH_LocalData *dmsg;
1158       struct GNUNET_MessageHeader *mh;
1159
1160       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload\n");
1161       if (GNUNET_NO == ch->allow_send)
1162       {
1163         /* This channel is not ready to transmit yet, try next message */
1164         next = th->next;
1165         continue;
1166       }
1167       ch->packet_size = 0;
1168       GNUNET_assert (size >= th->size);
1169       dmsg = (struct GNUNET_MESH_LocalData *) cbuf;
1170       mh = (struct GNUNET_MessageHeader *) &dmsg[1];
1171       psize = th->notify (th->notify_cls,
1172                           size - sizeof (struct GNUNET_MESH_LocalData),
1173                           mh);
1174       if (psize > 0)
1175       {
1176         psize += sizeof (struct GNUNET_MESH_LocalData);
1177         GNUNET_assert (size >= psize);
1178         dmsg->header.size = htons (psize);
1179         dmsg->id = htonl (ch->chid);
1180         dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA);
1181         LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload type %s\n",
1182              GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1183                 ch->allow_send = GNUNET_NO;
1184       }
1185       else
1186       {
1187         LOG (GNUNET_ERROR_TYPE_DEBUG,
1188              "#  callback returned size 0, "
1189              "application canceled transmission\n");
1190       }
1191     }
1192     else
1193     {
1194       struct GNUNET_MessageHeader *mh = (struct GNUNET_MessageHeader *) &th[1];
1195
1196       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  mesh internal traffic, type %s\n",
1197            GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1198       memcpy (cbuf, &th[1], th->size);
1199       psize = th->size;
1200     }
1201     if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1202       GNUNET_SCHEDULER_cancel (th->timeout_task);
1203     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1204     GNUNET_free (th);
1205     next = h->th_head;
1206     nsize = message_ready_size (h);
1207     cbuf += psize;
1208     size -= psize;
1209     tsize += psize;
1210   }
1211   LOG (GNUNET_ERROR_TYPE_DEBUG, "#  total size: %u\n", tsize);
1212   h->th = NULL;
1213   size = message_ready_size (h);
1214   if (0 != size)
1215   {
1216     LOG (GNUNET_ERROR_TYPE_DEBUG, "#  next size: %u\n", size);
1217     h->th =
1218         GNUNET_CLIENT_notify_transmit_ready (h->client, size,
1219                                              GNUNET_TIME_UNIT_FOREVER_REL,
1220                                              GNUNET_YES, &send_callback, h);
1221   }
1222   else
1223   {
1224     if (NULL != h->th_head)
1225       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  can't transmit any more\n");
1226     else
1227       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  nothing left to transmit\n");
1228   }
1229   if (GNUNET_NO == h->in_receive)
1230   {
1231     LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n");
1232     h->in_receive = GNUNET_YES;
1233     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1234                            GNUNET_TIME_UNIT_FOREVER_REL);
1235   }
1236   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() END\n");
1237   return tsize;
1238 }
1239
1240
1241 /**
1242  * Auxiliary function to send an already constructed packet to the service.
1243  * Takes care of creating a new queue element, copying the message and
1244  * calling the tmt_rdy function if necessary.
1245  *
1246  * @param h mesh handle
1247  * @param msg message to transmit
1248  * @param channel channel this send is related to (NULL if N/A)
1249  */
1250 static void
1251 send_packet (struct GNUNET_MESH_Handle *h,
1252              const struct GNUNET_MessageHeader *msg,
1253              struct GNUNET_MESH_Channel *channel)
1254 {
1255   struct GNUNET_MESH_TransmitHandle *th;
1256   size_t msize;
1257
1258   LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n",
1259        GNUNET_MESH_DEBUG_M2S(ntohs(msg->type)));
1260   msize = ntohs (msg->size);
1261   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle) + msize);
1262   th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1263   th->size = msize;
1264   th->channel = channel;
1265   memcpy (&th[1], msg, msize);
1266   add_to_queue (h, th);
1267   LOG (GNUNET_ERROR_TYPE_DEBUG, "  queued\n");
1268   if (NULL != h->th)
1269     return;
1270   LOG (GNUNET_ERROR_TYPE_DEBUG, "  calling ntfy tmt rdy for %u bytes\n", msize);
1271   h->th =
1272       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
1273                                            GNUNET_TIME_UNIT_FOREVER_REL,
1274                                            GNUNET_YES, &send_callback, h);
1275 }
1276
1277
1278 /******************************************************************************/
1279 /**********************      API CALL DEFINITIONS     *************************/
1280 /******************************************************************************/
1281
1282 struct GNUNET_MESH_Handle *
1283 GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls,
1284                      GNUNET_MESH_InboundChannelNotificationHandler new_channel,
1285                      GNUNET_MESH_ChannelEndHandler cleaner,
1286                      const struct GNUNET_MESH_MessageHandler *handlers,
1287                      const uint32_t *ports)
1288 {
1289   struct GNUNET_MESH_Handle *h;
1290
1291   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect()\n");
1292   h = GNUNET_malloc (sizeof (struct GNUNET_MESH_Handle));
1293   LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h);
1294   h->cfg = cfg;
1295   h->new_channel = new_channel;
1296   h->cleaner = cleaner;
1297   h->client = GNUNET_CLIENT_connect ("mesh", cfg);
1298   if (h->client == NULL)
1299   {
1300     GNUNET_break (0);
1301     GNUNET_free (h);
1302     return NULL;
1303   }
1304   h->cls = cls;
1305   h->message_handlers = handlers;
1306   h->ports = ports;
1307   h->next_chid = GNUNET_MESH_LOCAL_CHANNEL_ID_CLI;
1308   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1309   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1310
1311   if (NULL != ports && ports[0] != 0 && NULL == new_channel)
1312   {
1313     GNUNET_break (0);
1314     LOG (GNUNET_ERROR_TYPE_DEBUG,
1315          "no new channel handler given, ports parameter is useless!!\n");
1316   }
1317   if ((NULL == ports || ports[0] == 0) && NULL != new_channel)
1318   {
1319     GNUNET_break (0);
1320     LOG (GNUNET_ERROR_TYPE_DEBUG,
1321          "no ports given, new channel handler will never be called!!\n");
1322   }
1323   /* count handlers */
1324   for (h->n_handlers = 0;
1325        handlers && handlers[h->n_handlers].type;
1326        h->n_handlers++) ;
1327   for (h->n_ports = 0;
1328        ports && ports[h->n_ports];
1329        h->n_ports++) ;
1330   send_connect (h);
1331   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect() END\n");
1332   return h;
1333 }
1334
1335
1336 void
1337 GNUNET_MESH_disconnect (struct GNUNET_MESH_Handle *handle)
1338 {
1339   struct GNUNET_MESH_Channel *ch;
1340   struct GNUNET_MESH_Channel *aux;
1341   struct GNUNET_MESH_TransmitHandle *th;
1342
1343   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH DISCONNECT\n");
1344
1345   ch = handle->channels_head;
1346   while (NULL != ch)
1347   {
1348     aux = ch->next;
1349     if (ch->chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
1350     {
1351       GNUNET_break (0);
1352       LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X not destroyed\n", ch->chid);
1353     }
1354     destroy_channel (ch, GNUNET_YES);
1355     ch = aux;
1356   }
1357   while ( (th = handle->th_head) != NULL)
1358   {
1359     struct GNUNET_MessageHeader *msg;
1360
1361     /* Make sure it is an allowed packet (everything else should have been
1362      * already canceled).
1363      */
1364     GNUNET_break (GNUNET_NO == th_is_payload (th));
1365     msg = (struct GNUNET_MessageHeader *) &th[1];
1366     switch (ntohs(msg->type))
1367     {
1368       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT:
1369       case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE:
1370       case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY:
1371       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNELS:
1372       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNEL:
1373         break;
1374       default:
1375         GNUNET_break (0);
1376         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected msg %u\n",
1377              ntohs(msg->type));
1378     }
1379
1380     GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th);
1381     GNUNET_free (th);
1382   }
1383
1384   if (NULL != handle->th)
1385   {
1386     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1387     handle->th = NULL;
1388   }
1389   if (NULL != handle->client)
1390   {
1391     GNUNET_CLIENT_disconnect (handle->client);
1392     handle->client = NULL;
1393   }
1394   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
1395   {
1396     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1397     handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1398   }
1399   GNUNET_free (handle);
1400 }
1401
1402
1403 /**
1404  * Create a new channel towards a remote peer.
1405  *
1406  * If the destination port is not open by any peer or the destination peer
1407  * does not accept the channel, #GNUNET_MESH_ChannelEndHandler will be called
1408  * for this channel.
1409  *
1410  * @param h mesh handle
1411  * @param channel_ctx client's channel context to associate with the channel
1412  * @param peer peer identity the channel should go to
1413  * @param port Port number.
1414  * @param nobuffer Flag for disabling buffering on relay nodes.
1415  * @param reliable Flag for end-to-end reliability.
1416  *
1417  * @return handle to the channel
1418  */
1419 struct GNUNET_MESH_Channel *
1420 GNUNET_MESH_channel_create (struct GNUNET_MESH_Handle *h,
1421                            void *channel_ctx,
1422                            const struct GNUNET_PeerIdentity *peer,
1423                            uint32_t port,
1424                            int nobuffer,
1425                            int reliable)
1426 {
1427   struct GNUNET_MESH_Channel *ch;
1428   struct GNUNET_MESH_ChannelMessage msg;
1429
1430   LOG (GNUNET_ERROR_TYPE_DEBUG,
1431        "Creating new channel to %s:%u\n",
1432        GNUNET_i2s (peer), port);
1433   ch = create_channel (h, 0);
1434   LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", ch);
1435   LOG (GNUNET_ERROR_TYPE_DEBUG, "  number %X\n", ch->chid);
1436   ch->ctx = channel_ctx;
1437   ch->peer = GNUNET_PEER_intern (peer);
1438   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE);
1439   msg.header.size = htons (sizeof (struct GNUNET_MESH_ChannelMessage));
1440   msg.channel_id = htonl (ch->chid);
1441   msg.port = htonl (port);
1442   msg.peer = *peer;
1443   msg.opt = 0;
1444   if (GNUNET_YES == reliable)
1445     msg.opt |= GNUNET_MESH_OPTION_RELIABLE;
1446   if (GNUNET_YES == nobuffer)
1447     msg.opt |= GNUNET_MESH_OPTION_NOBUFFER;
1448   msg.opt = htonl (msg.opt);
1449   ch->allow_send = 0;
1450   send_packet (h, &msg.header, ch);
1451   return ch;
1452 }
1453
1454
1455 void
1456 GNUNET_MESH_channel_destroy (struct GNUNET_MESH_Channel *channel)
1457 {
1458   struct GNUNET_MESH_Handle *h;
1459   struct GNUNET_MESH_ChannelMessage msg;
1460   struct GNUNET_MESH_TransmitHandle *th;
1461
1462   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n");
1463   h = channel->mesh;
1464
1465   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY);
1466   msg.header.size = htons (sizeof (struct GNUNET_MESH_ChannelMessage));
1467   msg.channel_id = htonl (channel->chid);
1468   memset (&msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
1469   msg.port = 0;
1470   msg.opt = 0;
1471   th = h->th_head;
1472   while (th != NULL)
1473   {
1474     struct GNUNET_MESH_TransmitHandle *aux;
1475     if (th->channel == channel)
1476     {
1477       aux = th->next;
1478       /* FIXME call the handler? */
1479       if (GNUNET_YES == th_is_payload (th))
1480         th->notify (th->notify_cls, 0, NULL);
1481       GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1482       GNUNET_free (th);
1483       th = aux;
1484     }
1485     else
1486       th = th->next;
1487   }
1488
1489   destroy_channel (channel, GNUNET_YES);
1490   send_packet (h, &msg.header, NULL);
1491 }
1492
1493
1494 /**
1495  * Get information about a channel.
1496  *
1497  * @param channel Channel handle.
1498  * @param option Query (GNUNET_MESH_OPTION_*).
1499  * @param ... dependant on option, currently not used
1500  *
1501  * @return Union with an answer to the query.
1502  */
1503 const union GNUNET_MESH_ChannelInfo *
1504 GNUNET_MESH_channel_get_info (struct GNUNET_MESH_Channel *channel,
1505                              enum MeshOption option, ...)
1506 {
1507   const union GNUNET_MESH_ChannelInfo *ret;
1508
1509   switch (option)
1510   {
1511     case GNUNET_MESH_OPTION_NOBUFFER:
1512       ret = (const union GNUNET_MESH_ChannelInfo *) &channel->nobuffer;
1513       break;
1514     case GNUNET_MESH_OPTION_RELIABLE:
1515       ret = (const union GNUNET_MESH_ChannelInfo *) &channel->reliable;
1516       break;
1517     case GNUNET_MESH_OPTION_OOORDER:
1518       ret = (const union GNUNET_MESH_ChannelInfo *) &channel->ooorder;
1519       break;
1520     case GNUNET_MESH_OPTION_PEER:
1521       ret = (const union GNUNET_MESH_ChannelInfo *) &channel->peer;
1522       break;
1523     default:
1524       GNUNET_break (0);
1525       return NULL;
1526   }
1527
1528   return ret;
1529 }
1530
1531 struct GNUNET_MESH_TransmitHandle *
1532 GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Channel *channel, int cork,
1533                                    struct GNUNET_TIME_Relative maxdelay,
1534                                    size_t notify_size,
1535                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1536                                    void *notify_cls)
1537 {
1538   struct GNUNET_MESH_TransmitHandle *th;
1539
1540   GNUNET_assert (NULL != channel);
1541   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY\n");
1542   LOG (GNUNET_ERROR_TYPE_DEBUG, "    on channel %X\n", channel->chid);
1543   LOG (GNUNET_ERROR_TYPE_DEBUG, "    allow_send %d\n", channel->allow_send);
1544   if (channel->chid >= GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
1545     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to origin\n");
1546   else
1547     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to destination\n");
1548   LOG (GNUNET_ERROR_TYPE_DEBUG, "    payload size %u\n", notify_size);
1549   GNUNET_assert (NULL != notify);
1550   GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed
1551   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle));
1552   th->channel = channel;
1553   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1554   th->size = notify_size + sizeof (struct GNUNET_MESH_LocalData);
1555   channel->packet_size = th->size;
1556   LOG (GNUNET_ERROR_TYPE_DEBUG, "    total size %u\n", th->size);
1557   th->notify = notify;
1558   th->notify_cls = notify_cls;
1559   add_to_queue (channel->mesh, th);
1560   if (NULL != channel->mesh->th)
1561     return th;
1562   if (GNUNET_NO == channel->allow_send)
1563     return th;
1564   LOG (GNUNET_ERROR_TYPE_DEBUG, "    call client notify tmt rdy\n");
1565   channel->mesh->th =
1566       GNUNET_CLIENT_notify_transmit_ready (channel->mesh->client, th->size,
1567                                            GNUNET_TIME_UNIT_FOREVER_REL,
1568                                            GNUNET_YES, &send_callback,
1569                                            channel->mesh);
1570   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY END\n");
1571   return th;
1572 }
1573
1574
1575 void
1576 GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th)
1577 {
1578   struct GNUNET_MESH_Handle *mesh;
1579
1580   th->channel->packet_size = 0;
1581   mesh = th->channel->mesh;
1582   if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1583     GNUNET_SCHEDULER_cancel (th->timeout_task);
1584   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
1585   GNUNET_free (th);
1586   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
1587   {
1588     /* queue empty, no point in asking for transmission */
1589     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
1590     mesh->th = NULL;
1591   }
1592 }
1593
1594
1595 void
1596 GNUNET_MESH_receive_done (struct GNUNET_MESH_Channel *channel)
1597 {
1598   send_ack (channel);
1599 }
1600
1601
1602 /**
1603  * Request information about the running mesh peer.
1604  * The callback will be called for every channel known to the service,
1605  * listing all active peers that blong to the channel.
1606  *
1607  * If called again on the same handle, it will overwrite the previous
1608  * callback and cls. To retrieve the cls, monitor_cancel must be
1609  * called first.
1610  *
1611  * WARNING: unstable API, likely to change in the future!
1612  *
1613  * @param h Handle to the mesh peer.
1614  * @param callback Function to call with the requested data.
1615  * @param callback_cls Closure for @c callback.
1616  */
1617 void
1618 GNUNET_MESH_get_channels (struct GNUNET_MESH_Handle *h,
1619                          GNUNET_MESH_ChannelsCB callback,
1620                          void *callback_cls)
1621 {
1622   struct GNUNET_MessageHeader msg;
1623
1624   msg.size = htons (sizeof (msg));
1625   msg.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNELS);
1626   send_packet (h, &msg, NULL);
1627   h->channels_cb = callback;
1628   h->channels_cls = callback_cls;
1629 }
1630
1631
1632 /**
1633  * Cancel a monitor request. The monitor callback will not be called.
1634  *
1635  * @param h Mesh handle.
1636  *
1637  * @return Closure given to GNUNET_MESH_monitor, if any.
1638  */
1639 void *
1640 GNUNET_MESH_get_channels_cancel (struct GNUNET_MESH_Handle *h)
1641 {
1642   void *cls;
1643
1644   cls = h->channels_cls;
1645   h->channels_cb = NULL;
1646   h->channels_cls = NULL;
1647   return cls;
1648 }
1649
1650
1651 /**
1652  * Request information about a specific channel of the running mesh peer.
1653  *
1654  * WARNING: unstable API, likely to change in the future!
1655  * FIXME Add destination option.
1656  *
1657  * @param h Handle to the mesh peer.
1658  * @param initiator ID of the owner of the channel.
1659  * @param channel_number Channel number.
1660  * @param callback Function to call with the requested data.
1661  * @param callback_cls Closure for @c callback.
1662  */
1663 void
1664 GNUNET_MESH_show_channel (struct GNUNET_MESH_Handle *h,
1665                          struct GNUNET_PeerIdentity *initiator,
1666                          unsigned int channel_number,
1667                          GNUNET_MESH_ChannelCB callback,
1668                          void *callback_cls)
1669 {
1670   struct GNUNET_MESH_LocalMonitor msg;
1671
1672   msg.header.size = htons (sizeof (msg));
1673   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNEL);
1674   msg.owner = *initiator;
1675   msg.channel_id = htonl (channel_number);
1676   msg.reserved = 0;
1677   send_packet (h, &msg.header, NULL);
1678   h->channel_cb = callback;
1679   h->channel_cls = callback_cls;
1680 }
1681
1682
1683 /**
1684  * Function called to notify a client about the connection
1685  * begin ready to queue more data.  "buf" will be
1686  * NULL and "size" zero if the connection was closed for
1687  * writing in the meantime.
1688  *
1689  * @param cls closure
1690  * @param size number of bytes available in buf
1691  * @param buf where the callee should write the message
1692  * @return number of bytes written to buf
1693  */
1694 static size_t
1695 mesh_mq_ntr (void *cls, size_t size,
1696              void *buf)
1697 {
1698   struct GNUNET_MQ_Handle *mq = cls;
1699   struct MeshMQState *state = GNUNET_MQ_impl_state (mq);
1700   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
1701   uint16_t msize;
1702
1703   state->th = NULL;
1704   if (NULL == buf)
1705   {
1706     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
1707     return 0;
1708   }
1709   msize = ntohs (msg->size);
1710   GNUNET_assert (msize <= size);
1711   memcpy (buf, msg, msize);
1712   GNUNET_MQ_impl_send_continue (mq);
1713   GNUNET_MQ_impl_send_commit (mq);
1714   return msize;
1715 }
1716
1717
1718 /**
1719  * Signature of functions implementing the
1720  * sending functionality of a message queue.
1721  *
1722  * @param mq the message queue
1723  * @param msg the message to send
1724  * @param impl_state state of the implementation
1725  */
1726 static void
1727 mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq,
1728                    const struct GNUNET_MessageHeader *msg, void *impl_state)
1729 {
1730   struct MeshMQState *state = impl_state;
1731
1732   GNUNET_assert (NULL == state->th);
1733   state->th =
1734       GNUNET_MESH_notify_transmit_ready (state->channel,
1735                                          /* FIXME: add option for corking */
1736                                          GNUNET_NO,
1737                                          GNUNET_TIME_UNIT_FOREVER_REL,
1738                                          ntohs (msg->size),
1739                                          mesh_mq_ntr, mq);
1740
1741 }
1742
1743
1744 /**
1745  * Signature of functions implementing the
1746  * destruction of a message queue.
1747  * Implementations must not free 'mq', but should
1748  * take care of 'impl_state'.
1749  *
1750  * @param mq the message queue to destroy
1751  * @param impl_state state of the implementation
1752  */
1753 static void
1754 mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
1755 {
1756   struct MeshMQState *state = impl_state;
1757
1758   if (NULL != state->th)
1759     GNUNET_MESH_notify_transmit_ready_cancel (state->th);
1760
1761   GNUNET_free (state);
1762 }
1763
1764
1765 /**
1766  * Create a message queue for a mesh channel.
1767  * The message queue can only be used to transmit messages,
1768  * not to receive them.
1769  *
1770  * @param channel the channel to create the message qeue for
1771  * @return a message queue to messages over the channel
1772  */
1773 struct GNUNET_MQ_Handle *
1774 GNUNET_MESH_mq_create (struct GNUNET_MESH_Channel *channel)
1775 {
1776   struct GNUNET_MQ_Handle *mq;
1777   struct MeshMQState *state;
1778
1779   state = GNUNET_new (struct MeshMQState);
1780   state->channel = channel;
1781
1782   mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl,
1783                                       mesh_mq_destroy_impl,
1784                                       NULL, /* FIXME: cancel impl. */
1785                                       state,
1786                                       NULL, /* no msg handlers */
1787                                       NULL, /* no err handlers */
1788                                       NULL); /* no handler cls */
1789   return mq;
1790 }
1791