- allow for destruction of channel inside inbound handler
[oweals/gnunet.git] / src / mesh / mesh_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4      GNUnet is free software; you can redistribute it and/or modify
5      it under the terms of the GNU General Public License as published
6      by the Free Software Foundation; either version 3, or (at your
7      option) any later version.
8      GNUnet is distributed in the hope that it will be useful, but
9      WITHOUT ANY WARRANTY; without even the implied warranty of
10      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11      General Public License for more details.
12      You should have received a copy of the GNU General Public License
13      along with GNUnet; see the file COPYING.  If not, write to the
14      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
15      Boston, MA 02111-1307, USA.
16 */
17
18 /**
19  * @file mesh/mesh_api.c
20  * @brief mesh api: client implementation of new mesh service
21  * @author Bartlomiej Polot
22  */
23
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_mesh_service.h"
27 #include "mesh.h"
28 #include "mesh_protocol.h"
29
30 #define LOG(kind,...) GNUNET_log_from (kind, "mesh-api",__VA_ARGS__)
31
32 /******************************************************************************/
33 /************************      DATA STRUCTURES     ****************************/
34 /******************************************************************************/
35
36 /**
37  * Transmission queue to the service
38  */
39 struct GNUNET_MESH_TransmitHandle
40 {
41
42     /**
43      * Double Linked list
44      */
45   struct GNUNET_MESH_TransmitHandle *next;
46
47     /**
48      * Double Linked list
49      */
50   struct GNUNET_MESH_TransmitHandle *prev;
51
52     /**
53      * Channel this message is sent on / for (may be NULL for control messages).
54      */
55   struct GNUNET_MESH_Channel *channel;
56
57     /**
58      * Callback to obtain the message to transmit, or NULL if we
59      * got the message in 'data'.  Notice that messages built
60      * by 'notify' need to be encapsulated with information about
61      * the 'target'.
62      */
63   GNUNET_CONNECTION_TransmitReadyNotify notify;
64
65     /**
66      * Closure for 'notify'
67      */
68   void *notify_cls;
69
70     /**
71      * How long is this message valid.  Once the timeout has been
72      * reached, the message must no longer be sent.  If this
73      * is a message with a 'notify' callback set, the 'notify'
74      * function should be called with 'buf' NULL and size 0.
75      */
76   struct GNUNET_TIME_Absolute timeout;
77
78     /**
79      * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER.
80      */
81   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
82
83     /**
84      * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL.
85      */
86   size_t size;
87 };
88
89
90 /**
91  * Opaque handle to the service.
92  */
93 struct GNUNET_MESH_Handle
94 {
95
96     /**
97      * Handle to the server connection, to send messages later
98      */
99   struct GNUNET_CLIENT_Connection *client;
100
101     /**
102      * Set of handlers used for processing incoming messages in the channels
103      */
104   const struct GNUNET_MESH_MessageHandler *message_handlers;
105
106   /**
107    * Number of handlers in the handlers array.
108    */
109   unsigned int n_handlers;
110
111   /**
112    * Ports open.
113    */
114   const uint32_t *ports;
115
116   /**
117    * Number of ports.
118    */
119   unsigned int n_ports;
120
121     /**
122      * Double linked list of the channels this client is connected to, head.
123      */
124   struct GNUNET_MESH_Channel *channels_head;
125
126     /**
127      * Double linked list of the channels this client is connected to, tail.
128      */
129   struct GNUNET_MESH_Channel *channels_tail;
130
131     /**
132      * Callback for inbound channel creation
133      */
134   GNUNET_MESH_InboundChannelNotificationHandler *new_channel;
135
136     /**
137      * Callback for inbound channel disconnection
138      */
139   GNUNET_MESH_ChannelEndHandler *cleaner;
140
141     /**
142      * Handle to cancel pending transmissions in case of disconnection
143      */
144   struct GNUNET_CLIENT_TransmitHandle *th;
145
146     /**
147      * Closure for all the handlers given by the client
148      */
149   void *cls;
150
151     /**
152      * Messages to send to the service, head.
153      */
154   struct GNUNET_MESH_TransmitHandle *th_head;
155
156     /**
157      * Messages to send to the service, tail.
158      */
159   struct GNUNET_MESH_TransmitHandle *th_tail;
160
161     /**
162      * chid of the next channel to create (to avoid reusing IDs often)
163      */
164   MESH_ChannelNumber next_chid;
165
166     /**
167      * Have we started the task to receive messages from the service
168      * yet? We do this after we send the 'MESH_LOCAL_CONNECT' message.
169      */
170   int in_receive;
171
172   /**
173    * Configuration given by the client, in case of reconnection
174    */
175   const struct GNUNET_CONFIGURATION_Handle *cfg;
176
177   /**
178    * Time to the next reconnect in case one reconnect fails
179    */
180   struct GNUNET_TIME_Relative reconnect_time;
181
182   /**
183    * Task for trying to reconnect.
184    */
185   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
186
187   /**
188    * Monitor callback
189    */
190   GNUNET_MESH_ChannelsCB channels_cb;
191
192   /**
193    * Monitor callback closure.
194    */
195   void *channels_cls;
196
197   /**
198    * Channel callback.
199    */
200   GNUNET_MESH_ChannelCB channel_cb;
201
202   /**
203    * Channel callback closure.
204    */
205   void *channel_cls;
206 };
207
208
209 /**
210  * Description of a peer
211  */
212 struct GNUNET_MESH_Peer
213 {
214     /**
215      * ID of the peer in short form
216      */
217   GNUNET_PEER_Id id;
218
219   /**
220    * Channel this peer belongs to
221    */
222   struct GNUNET_MESH_Channel *t;
223
224   /**
225    * Flag indicating whether service has informed about its connection
226    * FIXME-BART: is this flag used? Seems dead right now...
227    */
228   int connected;
229
230 };
231
232
233 /**
234  * Opaque handle to a channel.
235  */
236 struct GNUNET_MESH_Channel
237 {
238
239     /**
240      * DLL next
241      */
242   struct GNUNET_MESH_Channel *next;
243
244     /**
245      * DLL prev
246      */
247   struct GNUNET_MESH_Channel *prev;
248
249     /**
250      * Handle to the mesh this channel belongs to
251      */
252   struct GNUNET_MESH_Handle *mesh;
253
254     /**
255      * Local ID of the channel
256      */
257   MESH_ChannelNumber chid;
258
259     /**
260      * Port number.
261      */
262   uint32_t port;
263
264     /**
265      * Other end of the channel.
266      */
267   GNUNET_PEER_Id peer;
268
269   /**
270    * Any data the caller wants to put in here
271    */
272   void *ctx;
273
274     /**
275      * Size of packet queued in this channel
276      */
277   unsigned int packet_size;
278
279     /**
280      * Is the channel allowed to buffer?
281      */
282   int nobuffer;
283
284     /**
285      * Is the channel realiable?
286      */
287   int reliable;
288
289     /**
290      * If reliable, is the channel out of order?
291      */
292   int ooorder;
293
294     /**
295      * Are we allowed to send to the service?
296      */
297   int allow_send;
298
299 };
300
301
302 /**
303  * Implementation state for mesh's message queue.
304  */
305 struct MeshMQState
306 {
307   /**
308    * The current transmit handle, or NULL
309    * if no transmit is active.
310    */
311   struct GNUNET_MESH_TransmitHandle *th;
312
313   /**
314    * Channel to send the data over.
315    */
316   struct GNUNET_MESH_Channel *channel;
317 };
318
319
320 /******************************************************************************/
321 /***********************         DECLARATIONS         *************************/
322 /******************************************************************************/
323
324 /**
325  * Function called to send a message to the service.
326  * "buf" will be NULL and "size" zero if the socket was closed for writing in
327  * the meantime.
328  *
329  * @param cls closure, the mesh handle
330  * @param size number of bytes available in buf
331  * @param buf where the callee should write the connect message
332  * @return number of bytes written to buf
333  */
334 static size_t
335 send_callback (void *cls, size_t size, void *buf);
336
337
338 /******************************************************************************/
339 /***********************     AUXILIARY FUNCTIONS      *************************/
340 /******************************************************************************/
341
342 /**
343  * Check if transmission is a payload packet.
344  *
345  * @param th Transmission handle.
346  *
347  * @return GNUNET_YES if it is a payload packet,
348  *         GNUNET_NO if it is a mesh management packet.
349  */
350 static int
351 th_is_payload (struct GNUNET_MESH_TransmitHandle *th)
352 {
353   return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO;
354 }
355
356
357 /**
358  * Check whether there is any message ready in the queue and find the size.
359  *
360  * @param h Mesh handle.
361  *
362  * @return The size of the first ready message in the queue,
363  *         0 if there is none.
364  */
365 static size_t
366 message_ready_size (struct GNUNET_MESH_Handle *h)
367 {
368   struct GNUNET_MESH_TransmitHandle *th;
369   struct GNUNET_MESH_Channel *ch;
370
371   for (th = h->th_head; NULL != th; th = th->next)
372   {
373     ch = th->channel;
374     if (GNUNET_NO == th_is_payload (th))
375     {
376       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message internal\n");
377       return th->size;
378     }
379     if (GNUNET_YES == ch->allow_send)
380     {
381       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  message payload ok\n");
382       return th->size;
383     }
384   }
385   return 0;
386 }
387
388
389 /**
390  * Get the channel handler for the channel specified by id from the given handle
391  * @param h Mesh handle
392  * @param chid ID of the wanted channel
393  * @return handle to the required channel or NULL if not found
394  */
395 static struct GNUNET_MESH_Channel *
396 retrieve_channel (struct GNUNET_MESH_Handle *h, MESH_ChannelNumber chid)
397 {
398   struct GNUNET_MESH_Channel *ch;
399
400   ch = h->channels_head;
401   while (ch != NULL)
402   {
403     if (ch->chid == chid)
404       return ch;
405     ch = ch->next;
406   }
407   return NULL;
408 }
409
410
411 /**
412  * Create a new channel and insert it in the channel list of the mesh handle
413  *
414  * @param h Mesh handle
415  * @param chid Desired chid of the channel, 0 to assign one automatically.
416  *
417  * @return Handle to the created channel.
418  */
419 static struct GNUNET_MESH_Channel *
420 create_channel (struct GNUNET_MESH_Handle *h, MESH_ChannelNumber chid)
421 {
422   struct GNUNET_MESH_Channel *ch;
423
424   ch = GNUNET_malloc (sizeof (struct GNUNET_MESH_Channel));
425   GNUNET_CONTAINER_DLL_insert (h->channels_head, h->channels_tail, ch);
426   ch->mesh = h;
427   if (0 == chid)
428   {
429     ch->chid = h->next_chid;
430     while (NULL != retrieve_channel (h, h->next_chid))
431     {
432       h->next_chid++;
433       h->next_chid &= ~GNUNET_MESH_LOCAL_CHANNEL_ID_SERV;
434       h->next_chid |= GNUNET_MESH_LOCAL_CHANNEL_ID_CLI;
435     }
436   }
437   else
438   {
439     ch->chid = chid;
440   }
441   ch->allow_send = GNUNET_NO;
442   ch->nobuffer = GNUNET_NO;
443   return ch;
444 }
445
446
447 /**
448  * Destroy the specified channel.
449  * - Destroys all peers, calling the disconnect callback on each if needed
450  * - Cancels all outgoing traffic for that channel, calling respective notifys
451  * - Calls cleaner if channel was inbound
452  * - Frees all memory used
453  *
454  * @param ch Pointer to the channel.
455  * @param call_cleaner Whether to call the cleaner handler.
456  *
457  * @return Handle to the required channel or NULL if not found.
458  */
459 static void
460 destroy_channel (struct GNUNET_MESH_Channel *ch, int call_cleaner)
461 {
462   struct GNUNET_MESH_Handle *h;
463   struct GNUNET_MESH_TransmitHandle *th;
464   struct GNUNET_MESH_TransmitHandle *next;
465
466   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroy_channel %X\n", ch->chid);
467
468   if (NULL == ch)
469   {
470     GNUNET_break (0);
471     return;
472   }
473   h = ch->mesh;
474
475   GNUNET_CONTAINER_DLL_remove (h->channels_head, h->channels_tail, ch);
476
477   /* signal channel destruction */
478   if ( (NULL != h->cleaner) && (0 != ch->peer) && (GNUNET_YES == call_cleaner) )
479     h->cleaner (h->cls, ch, ch->ctx);
480
481   /* check that clients did not leave messages behind in the queue */
482   for (th = h->th_head; NULL != th; th = next)
483   {
484     next = th->next;
485     if (th->channel != ch)
486       continue;
487     /* Clients should have aborted their requests already.
488      * Management traffic should be ok, as clients can't cancel that.
489      * If the service crashed and we are reconnecting, it's ok.
490      */
491     GNUNET_break (GNUNET_NO == th_is_payload (th)
492                   || GNUNET_NO == h->in_receive);
493     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
494
495     /* clean up request */
496     if (GNUNET_SCHEDULER_NO_TASK != th->timeout_task)
497       GNUNET_SCHEDULER_cancel (th->timeout_task);
498     GNUNET_free (th);
499   }
500
501   /* if there are no more pending requests with mesh service, cancel active request */
502   /* Note: this should be unnecessary... */
503   if ((0 == message_ready_size (h)) && (NULL != h->th))
504   {
505     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
506     h->th = NULL;
507   }
508
509   if (0 != ch->peer)
510     GNUNET_PEER_change_rc (ch->peer, -1);
511   GNUNET_free (ch);
512   return;
513 }
514
515
516 /**
517  * Notify client that the transmission has timed out
518  *
519  * @param cls closure
520  * @param tc task context
521  */
522 static void
523 timeout_transmission (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
524 {
525   struct GNUNET_MESH_TransmitHandle *th = cls;
526   struct GNUNET_MESH_Handle *mesh;
527
528   mesh = th->channel->mesh;
529   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
530   th->channel->packet_size = 0;
531   if (GNUNET_YES == th_is_payload (th))
532     th->notify (th->notify_cls, 0, NULL);
533   GNUNET_free (th);
534   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
535   {
536     /* nothing ready to transmit, no point in asking for transmission */
537     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
538     mesh->th = NULL;
539   }
540 }
541
542
543 /**
544  * Add a transmit handle to the transmission queue and set the
545  * timeout if needed.
546  *
547  * @param h mesh handle with the queue head and tail
548  * @param th handle to the packet to be transmitted
549  */
550 static void
551 add_to_queue (struct GNUNET_MESH_Handle *h,
552               struct GNUNET_MESH_TransmitHandle *th)
553 {
554   GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
555   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us == th->timeout.abs_value_us)
556     return;
557   th->timeout_task =
558       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
559                                     (th->timeout), &timeout_transmission, th);
560 }
561
562
563 /**
564  * Auxiliary function to send an already constructed packet to the service.
565  * Takes care of creating a new queue element, copying the message and
566  * calling the tmt_rdy function if necessary.
567  *
568  * @param h mesh handle
569  * @param msg message to transmit
570  * @param channel channel this send is related to (NULL if N/A)
571  */
572 static void
573 send_packet (struct GNUNET_MESH_Handle *h,
574              const struct GNUNET_MessageHeader *msg,
575              struct GNUNET_MESH_Channel *channel);
576
577
578 /**
579  * Send an ack on the channel to confirm the processing of a message.
580  *
581  * @param ch Channel on which to send the ACK.
582  */
583 static void
584 send_ack (struct GNUNET_MESH_Channel *ch)
585 {
586   struct GNUNET_MESH_LocalAck msg;
587
588   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on channel %X\n", ch->chid);
589   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
590   msg.header.size = htons (sizeof (msg));
591   msg.channel_id = htonl (ch->chid);
592
593   send_packet (ch->mesh, &msg.header, ch);
594   return;
595 }
596
597
598
599 /**
600  * Reconnect callback: tries to reconnect again after a failer previous
601  * reconnecttion
602  * @param cls closure (mesh handle)
603  * @param tc task context
604  */
605 static void
606 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
607
608
609 /**
610  * Send a connect packet to the service with the applications and types
611  * requested by the user.
612  *
613  * @param h The mesh handle.
614  *
615  */
616 static void
617 send_connect (struct GNUNET_MESH_Handle *h)
618 {
619   size_t size;
620
621   size = sizeof (struct GNUNET_MESH_ClientConnect);
622   size += h->n_ports * sizeof (uint32_t);
623   {
624     char buf[size] GNUNET_ALIGN;
625     struct GNUNET_MESH_ClientConnect *msg;
626     uint32_t *ports;
627     uint16_t i;
628
629     /* build connection packet */
630     msg = (struct GNUNET_MESH_ClientConnect *) buf;
631     msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT);
632     msg->header.size = htons (size);
633     ports = (uint32_t *) &msg[1];
634     for (i = 0; i < h->n_ports; i++)
635     {
636       ports[i] = htonl (h->ports[i]);
637       LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n",
638            h->ports[i]);
639     }
640     LOG (GNUNET_ERROR_TYPE_DEBUG,
641          "Sending %lu bytes long message with %u ports\n",
642          ntohs (msg->header.size), h->n_ports);
643     send_packet (h, &msg->header, NULL);
644   }
645 }
646
647
648 /**
649  * Reconnect to the service, retransmit all infomation to try to restore the
650  * original state.
651  *
652  * @param h handle to the mesh
653  *
654  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
655  */
656 static int
657 do_reconnect (struct GNUNET_MESH_Handle *h)
658 {
659   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
660   LOG (GNUNET_ERROR_TYPE_DEBUG, "*******   RECONNECT   *******\n");
661   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
662   LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h);
663   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
664
665   /* disconnect */
666   if (NULL != h->th)
667   {
668     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
669     h->th = NULL;
670   }
671   if (NULL != h->client)
672   {
673     GNUNET_CLIENT_disconnect (h->client);
674   }
675
676   /* connect again */
677   h->client = GNUNET_CLIENT_connect ("mesh", h->cfg);
678   if (h->client == NULL)
679   {
680     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
681                                                       &reconnect_cbk, h);
682     h->reconnect_time =
683         GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
684                                   GNUNET_TIME_relative_multiply
685                                   (h->reconnect_time, 2));
686     LOG (GNUNET_ERROR_TYPE_DEBUG, "Next retry in %s\n",
687          GNUNET_STRINGS_relative_time_to_string (h->reconnect_time,
688                                                  GNUNET_NO));
689     GNUNET_break (0);
690     return GNUNET_NO;
691   }
692   else
693   {
694     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
695   }
696   send_connect (h);
697   return GNUNET_YES;
698 }
699
700 /**
701  * Reconnect callback: tries to reconnect again after a failer previous
702  * reconnecttion
703  * @param cls closure (mesh handle)
704  * @param tc task context
705  */
706 static void
707 reconnect_cbk (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
708 {
709   struct GNUNET_MESH_Handle *h = cls;
710
711   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
712   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
713     return;
714   do_reconnect (h);
715 }
716
717
718 /**
719  * Reconnect to the service, retransmit all infomation to try to restore the
720  * original state.
721  *
722  * @param h handle to the mesh
723  *
724  * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
725  */
726 static void
727 reconnect (struct GNUNET_MESH_Handle *h)
728 {
729   struct GNUNET_MESH_Channel *ch;
730   struct GNUNET_MESH_Channel *next;
731
732   LOG (GNUNET_ERROR_TYPE_DEBUG,
733        "Requested RECONNECT, destroying all channels\n");
734   h->in_receive = GNUNET_NO;
735   for (ch = h->channels_head; NULL != ch; ch = next)
736   {
737     next = ch->next;
738     destroy_channel (ch, GNUNET_YES);
739   }
740   if (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task)
741     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
742                                                       &reconnect_cbk, h);
743 }
744
745
746 /******************************************************************************/
747 /***********************      RECEIVE HANDLERS     ****************************/
748 /******************************************************************************/
749
750 /**
751  * Process the new channel notification and add it to the channels in the handle
752  *
753  * @param h     The mesh handle
754  * @param msg   A message with the details of the new incoming channel
755  */
756 static void
757 process_channel_created (struct GNUNET_MESH_Handle *h,
758                         const struct GNUNET_MESH_ChannelMessage *msg)
759 {
760   struct GNUNET_MESH_Channel *ch;
761   MESH_ChannelNumber chid;
762   uint32_t port;
763
764   chid = ntohl (msg->channel_id);
765   port = ntohl (msg->port);
766   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming channel %X:%u\n", chid, port);
767   if (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
768   {
769     GNUNET_break (0);
770     return;
771   }
772   if (NULL != h->new_channel)
773   {
774     void *ctx;
775
776     ch = create_channel (h, chid);
777     ch->allow_send = GNUNET_NO;
778     ch->peer = GNUNET_PEER_intern (&msg->peer);
779     ch->mesh = h;
780     ch->chid = chid;
781     ch->port = port;
782     if (0 != (msg->opt & GNUNET_MESH_OPTION_NOBUFFER))
783       ch->nobuffer = GNUNET_YES;
784     else
785       ch->nobuffer = GNUNET_NO;
786
787     if (0 != (msg->opt & GNUNET_MESH_OPTION_RELIABLE))
788       ch->reliable = GNUNET_YES;
789     else
790       ch->reliable = GNUNET_NO;
791
792     if (GNUNET_YES == ch->reliable &&
793         0 != (msg->opt & GNUNET_MESH_OPTION_OOORDER))
794       ch->ooorder = GNUNET_YES;
795     else
796       ch->ooorder = GNUNET_NO;
797
798     LOG (GNUNET_ERROR_TYPE_DEBUG, "  created channel %p\n", ch);
799     ctx = h->new_channel (h->cls, ch, &msg->peer, ch->port);
800     if (NULL != ctx)
801       ch->ctx = ctx;
802     LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
803   }
804   else
805   {
806     struct GNUNET_MESH_ChannelMessage d_msg;
807
808     LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n");
809
810     d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY);
811     d_msg.header.size = htons (sizeof (struct GNUNET_MESH_ChannelMessage));
812     d_msg.channel_id = msg->channel_id;
813     memset (&d_msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
814     d_msg.port = 0;
815     d_msg.opt = 0;
816
817     send_packet (h, &d_msg.header, NULL);
818   }
819   return;
820 }
821
822
823 /**
824  * Process the channel destroy notification and free associated resources
825  *
826  * @param h     The mesh handle
827  * @param msg   A message with the details of the channel being destroyed
828  */
829 static void
830 process_channel_destroy (struct GNUNET_MESH_Handle *h,
831                          const struct GNUNET_MESH_ChannelMessage *msg)
832 {
833   struct GNUNET_MESH_Channel *ch;
834   MESH_ChannelNumber chid;
835
836   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel from service\n");
837   chid = ntohl (msg->channel_id);
838   ch = retrieve_channel (h, chid);
839
840   if (NULL == ch)
841   {
842     LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X unknown\n", chid);
843     return;
844   }
845   LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X destroyed\n", ch->chid);
846   destroy_channel (ch, GNUNET_YES);
847 }
848
849
850 /**
851  * Process the incoming data packets, call appropriate handlers.
852  *
853  * @param h         The mesh handle
854  * @param message   A message encapsulating the data
855  */
856 static void
857 process_incoming_data (struct GNUNET_MESH_Handle *h,
858                        const struct GNUNET_MessageHeader *message)
859 {
860   const struct GNUNET_MessageHeader *payload;
861   const struct GNUNET_MESH_MessageHandler *handler;
862   struct GNUNET_MESH_LocalData *dmsg;
863   struct GNUNET_MESH_Channel *ch;
864   unsigned int i;
865   uint16_t type;
866
867   LOG (GNUNET_ERROR_TYPE_DEBUG,
868        "Got a data message!\n");
869   dmsg = (struct GNUNET_MESH_LocalData *) message;
870   ch = retrieve_channel (h, ntohl (dmsg->id));
871   payload = (struct GNUNET_MessageHeader *) &dmsg[1];
872   LOG (GNUNET_ERROR_TYPE_DEBUG,
873        "  %s data on channel %s [%X]\n",
874        ch->chid >= GNUNET_MESH_LOCAL_CHANNEL_ID_SERV ? "fwd" : "bck",
875        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (dmsg->id));
876   if (NULL == ch)
877   {
878     /* Channel was ignored/destroyed, probably service didn't get it yet */
879     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ignored!\n");
880     return;
881   }
882   type = ntohs (payload->type);
883   LOG (GNUNET_ERROR_TYPE_DEBUG, "  payload type %u\n", type);
884   for (i = 0; i < h->n_handlers; i++)
885   {
886     handler = &h->message_handlers[i];
887     LOG (GNUNET_ERROR_TYPE_DEBUG,
888          "    checking handler for type %u\n",
889          handler->type);
890     if (handler->type == type)
891     {
892       if (GNUNET_OK !=
893           handler->callback (h->cls, ch, &ch->ctx, payload))
894       {
895         LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
896         GNUNET_MESH_channel_destroy (ch);
897         return;
898       }
899       else
900       {
901         LOG (GNUNET_ERROR_TYPE_DEBUG,
902              "callback completed successfully\n");
903         return;
904       }
905     }
906   }
907 }
908
909
910 /**
911  * Process a local ACK message, enabling the client to send
912  * more data to the service.
913  *
914  * @param h Mesh handle.
915  * @param message Message itself.
916  */
917 static void
918 process_ack (struct GNUNET_MESH_Handle *h,
919              const struct GNUNET_MessageHeader *message)
920 {
921   struct GNUNET_MESH_LocalAck *msg;
922   struct GNUNET_MESH_Channel *ch;
923   MESH_ChannelNumber chid;
924
925   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n");
926   msg = (struct GNUNET_MESH_LocalAck *) message;
927   chid = ntohl (msg->channel_id);
928     ch = retrieve_channel (h, chid);
929   if (NULL == ch)
930   {
931     LOG (GNUNET_ERROR_TYPE_WARNING, "ACK on unknown channel %X\n", chid);
932     return;
933   }
934   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on channel %X!\n", ch->chid);
935   ch->allow_send = GNUNET_YES;
936   if (NULL == h->th && 0 < ch->packet_size)
937   {
938     LOG (GNUNET_ERROR_TYPE_DEBUG, "  tmt rdy was NULL, requesting!\n");
939     h->th =
940         GNUNET_CLIENT_notify_transmit_ready (h->client, ch->packet_size,
941                                              GNUNET_TIME_UNIT_FOREVER_REL,
942                                              GNUNET_YES, &send_callback, h);
943   }
944 }
945
946
947 /*
948  * Process a local reply about info on all channels, pass info to the user.
949  *
950  * @param h Mesh handle.
951  * @param message Message itself.
952  */
953 // static void
954 // process_get_channels (struct GNUNET_MESH_Handle *h,
955 //                      const struct GNUNET_MessageHeader *message)
956 // {
957 //   struct GNUNET_MESH_LocalMonitor *msg;
958 //
959 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n");
960 //
961 //   if (NULL == h->channels_cb)
962 //   {
963 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
964 //     return;
965 //   }
966 //
967 //   msg = (struct GNUNET_MESH_LocalMonitor *) message;
968 //   if (ntohs (message->size) !=
969 //       (sizeof (struct GNUNET_MESH_LocalMonitor) +
970 //        sizeof (struct GNUNET_PeerIdentity)))
971 //   {
972 //     GNUNET_break_op (0);
973 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
974 //                 "Get channels message: size %hu - expected %u\n",
975 //                 ntohs (message->size),
976 //                 sizeof (struct GNUNET_MESH_LocalMonitor));
977 //     return;
978 //   }
979 //   h->channels_cb (h->channels_cls,
980 //                   ntohl (msg->channel_id),
981 //                   &msg->owner,
982 //                   &msg->destination);
983 // }
984
985
986
987 /*
988  * Process a local monitor_channel reply, pass info to the user.
989  *
990  * @param h Mesh handle.
991  * @param message Message itself.
992  */
993 // static void
994 // process_show_channel (struct GNUNET_MESH_Handle *h,
995 //                      const struct GNUNET_MessageHeader *message)
996 // {
997 //   struct GNUNET_MESH_LocalMonitor *msg;
998 //   size_t esize;
999 //
1000 //   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n");
1001 //
1002 //   if (NULL == h->channel_cb)
1003 //   {
1004 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "  ignored\n");
1005 //     return;
1006 //   }
1007 //
1008 //   /* Verify message sanity */
1009 //   msg = (struct GNUNET_MESH_LocalMonitor *) message;
1010 //   esize = sizeof (struct GNUNET_MESH_LocalMonitor);
1011 //   if (ntohs (message->size) != esize)
1012 //   {
1013 //     GNUNET_break_op (0);
1014 //     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1015 //                 "Show channel message: size %hu - expected %u\n",
1016 //                 ntohs (message->size),
1017 //                 esize);
1018 //
1019 //     h->channel_cb (h->channel_cls, NULL, NULL);
1020 //     h->channel_cb = NULL;
1021 //     h->channel_cls = NULL;
1022 //
1023 //     return;
1024 //   }
1025 //
1026 //   h->channel_cb (h->channel_cls,
1027 //                  &msg->destination,
1028 //                  &msg->owner);
1029 // }
1030
1031
1032 /**
1033  * Function to process all messages received from the service
1034  *
1035  * @param cls closure
1036  * @param msg message received, NULL on timeout or fatal error
1037  */
1038 static void
1039 msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
1040 {
1041   struct GNUNET_MESH_Handle *h = cls;
1042   uint16_t type;
1043
1044   if (msg == NULL)
1045   {
1046     LOG (GNUNET_ERROR_TYPE_DEBUG,
1047          "Mesh service disconnected, reconnecting\n", h);
1048     reconnect (h);
1049     return;
1050   }
1051   type = ntohs (msg->type);
1052   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1053   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n",
1054        GNUNET_MESH_DEBUG_M2S (type));
1055   switch (type)
1056   {
1057     /* Notify of a new incoming channel */
1058   case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE:
1059     process_channel_created (h, (struct GNUNET_MESH_ChannelMessage *) msg);
1060     break;
1061     /* Notify of a channel disconnection */
1062   case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY:
1063   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_NACK:
1064     process_channel_destroy (h, (struct GNUNET_MESH_ChannelMessage *) msg);
1065     break;
1066   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA:
1067     process_incoming_data (h, msg);
1068     break;
1069   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
1070     process_ack (h, msg);
1071     break;
1072 //   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNELS: DEPRECATED
1073 //     process_get_channels (h, msg);
1074 //     break;
1075 //   case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNEL: DEPRECATED
1076 //     process_show_channel (h, msg);
1077 //     break;
1078   default:
1079     /* We shouldn't get any other packages, log and ignore */
1080     LOG (GNUNET_ERROR_TYPE_WARNING,
1081          "unsolicited message form service (type %s)\n",
1082          GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
1083   }
1084   LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n");
1085   if (GNUNET_YES == h->in_receive)
1086   {
1087     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1088                            GNUNET_TIME_UNIT_FOREVER_REL);
1089   }
1090   else
1091   {
1092     LOG (GNUNET_ERROR_TYPE_DEBUG,
1093          "in receive off, not calling CLIENT_receive\n");
1094   }
1095 }
1096
1097
1098 /******************************************************************************/
1099 /************************       SEND FUNCTIONS     ****************************/
1100 /******************************************************************************/
1101
1102 /**
1103  * Function called to send a message to the service.
1104  * "buf" will be NULL and "size" zero if the socket was closed for writing in
1105  * the meantime.
1106  *
1107  * @param cls closure, the mesh handle
1108  * @param size number of bytes available in buf
1109  * @param buf where the callee should write the connect message
1110  * @return number of bytes written to buf
1111  */
1112 static size_t
1113 send_callback (void *cls, size_t size, void *buf)
1114 {
1115   struct GNUNET_MESH_Handle *h = cls;
1116   struct GNUNET_MESH_TransmitHandle *th;
1117   struct GNUNET_MESH_TransmitHandle *next;
1118   struct GNUNET_MESH_Channel *ch;
1119   char *cbuf = buf;
1120   size_t tsize;
1121   size_t psize;
1122   size_t nsize;
1123
1124   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1125   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() Buffer %u\n", size);
1126   if ((0 == size) || (NULL == buf))
1127   {
1128     LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h);
1129     reconnect (h);
1130     h->th = NULL;
1131     return 0;
1132   }
1133   tsize = 0;
1134   next = h->th_head;
1135   nsize = message_ready_size (h);
1136   while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
1137   {
1138     ch = th->channel;
1139     if (GNUNET_YES == th_is_payload (th))
1140     {
1141       struct GNUNET_MESH_LocalData *dmsg;
1142       struct GNUNET_MessageHeader *mh;
1143
1144       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload\n");
1145       if (GNUNET_NO == ch->allow_send)
1146       {
1147         /* This channel is not ready to transmit yet, try next message */
1148         next = th->next;
1149         continue;
1150       }
1151       ch->packet_size = 0;
1152       GNUNET_assert (size >= th->size);
1153       dmsg = (struct GNUNET_MESH_LocalData *) cbuf;
1154       mh = (struct GNUNET_MessageHeader *) &dmsg[1];
1155       psize = th->notify (th->notify_cls,
1156                           size - sizeof (struct GNUNET_MESH_LocalData),
1157                           mh);
1158       if (psize > 0)
1159       {
1160         psize += sizeof (struct GNUNET_MESH_LocalData);
1161         GNUNET_assert (size >= psize);
1162         dmsg->header.size = htons (psize);
1163         dmsg->id = htonl (ch->chid);
1164         dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_DATA);
1165         LOG (GNUNET_ERROR_TYPE_DEBUG, "#  payload type %s\n",
1166              GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1167                 ch->allow_send = GNUNET_NO;
1168       }
1169       else
1170       {
1171         LOG (GNUNET_ERROR_TYPE_DEBUG,
1172              "#  callback returned size 0, "
1173              "application canceled transmission\n");
1174       }
1175     }
1176     else
1177     {
1178       struct GNUNET_MessageHeader *mh = (struct GNUNET_MessageHeader *) &th[1];
1179
1180       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  mesh internal traffic, type %s\n",
1181            GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
1182       memcpy (cbuf, &th[1], th->size);
1183       psize = th->size;
1184     }
1185     if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1186       GNUNET_SCHEDULER_cancel (th->timeout_task);
1187     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1188     GNUNET_free (th);
1189     next = h->th_head;
1190     nsize = message_ready_size (h);
1191     cbuf += psize;
1192     size -= psize;
1193     tsize += psize;
1194   }
1195   LOG (GNUNET_ERROR_TYPE_DEBUG, "#  total size: %u\n", tsize);
1196   h->th = NULL;
1197   size = message_ready_size (h);
1198   if (0 != size)
1199   {
1200     LOG (GNUNET_ERROR_TYPE_DEBUG, "#  next size: %u\n", size);
1201     h->th =
1202         GNUNET_CLIENT_notify_transmit_ready (h->client, size,
1203                                              GNUNET_TIME_UNIT_FOREVER_REL,
1204                                              GNUNET_YES, &send_callback, h);
1205   }
1206   else
1207   {
1208     if (NULL != h->th_head)
1209       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  can't transmit any more\n");
1210     else
1211       LOG (GNUNET_ERROR_TYPE_DEBUG, "#  nothing left to transmit\n");
1212   }
1213   if (GNUNET_NO == h->in_receive)
1214   {
1215     LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n");
1216     h->in_receive = GNUNET_YES;
1217     GNUNET_CLIENT_receive (h->client, &msg_received, h,
1218                            GNUNET_TIME_UNIT_FOREVER_REL);
1219   }
1220   LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() END\n");
1221   return tsize;
1222 }
1223
1224
1225 /**
1226  * Auxiliary function to send an already constructed packet to the service.
1227  * Takes care of creating a new queue element, copying the message and
1228  * calling the tmt_rdy function if necessary.
1229  *
1230  * @param h mesh handle
1231  * @param msg message to transmit
1232  * @param channel channel this send is related to (NULL if N/A)
1233  */
1234 static void
1235 send_packet (struct GNUNET_MESH_Handle *h,
1236              const struct GNUNET_MessageHeader *msg,
1237              struct GNUNET_MESH_Channel *channel)
1238 {
1239   struct GNUNET_MESH_TransmitHandle *th;
1240   size_t msize;
1241
1242   LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n",
1243        GNUNET_MESH_DEBUG_M2S(ntohs(msg->type)));
1244   msize = ntohs (msg->size);
1245   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle) + msize);
1246   th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1247   th->size = msize;
1248   th->channel = channel;
1249   memcpy (&th[1], msg, msize);
1250   add_to_queue (h, th);
1251   LOG (GNUNET_ERROR_TYPE_DEBUG, "  queued\n");
1252   if (NULL != h->th)
1253     return;
1254   LOG (GNUNET_ERROR_TYPE_DEBUG, "  calling ntfy tmt rdy for %u bytes\n", msize);
1255   h->th =
1256       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
1257                                            GNUNET_TIME_UNIT_FOREVER_REL,
1258                                            GNUNET_YES, &send_callback, h);
1259 }
1260
1261
1262 /******************************************************************************/
1263 /**********************      API CALL DEFINITIONS     *************************/
1264 /******************************************************************************/
1265
1266 struct GNUNET_MESH_Handle *
1267 GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls,
1268                      GNUNET_MESH_InboundChannelNotificationHandler new_channel,
1269                      GNUNET_MESH_ChannelEndHandler cleaner,
1270                      const struct GNUNET_MESH_MessageHandler *handlers,
1271                      const uint32_t *ports)
1272 {
1273   struct GNUNET_MESH_Handle *h;
1274
1275   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect()\n");
1276   h = GNUNET_malloc (sizeof (struct GNUNET_MESH_Handle));
1277   LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h);
1278   h->cfg = cfg;
1279   h->new_channel = new_channel;
1280   h->cleaner = cleaner;
1281   h->client = GNUNET_CLIENT_connect ("mesh", cfg);
1282   if (h->client == NULL)
1283   {
1284     GNUNET_break (0);
1285     GNUNET_free (h);
1286     return NULL;
1287   }
1288   h->cls = cls;
1289   h->message_handlers = handlers;
1290   h->ports = ports;
1291   h->next_chid = GNUNET_MESH_LOCAL_CHANNEL_ID_CLI;
1292   h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1293   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1294
1295   if (NULL != ports && ports[0] != 0 && NULL == new_channel)
1296   {
1297     GNUNET_break (0);
1298     LOG (GNUNET_ERROR_TYPE_DEBUG,
1299          "no new channel handler given, ports parameter is useless!!\n");
1300   }
1301   if ((NULL == ports || ports[0] == 0) && NULL != new_channel)
1302   {
1303     GNUNET_break (0);
1304     LOG (GNUNET_ERROR_TYPE_DEBUG,
1305          "no ports given, new channel handler will never be called!!\n");
1306   }
1307   /* count handlers */
1308   for (h->n_handlers = 0;
1309        handlers && handlers[h->n_handlers].type;
1310        h->n_handlers++) ;
1311   for (h->n_ports = 0;
1312        ports && ports[h->n_ports];
1313        h->n_ports++) ;
1314   send_connect (h);
1315   LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_MESH_connect() END\n");
1316   return h;
1317 }
1318
1319
1320 void
1321 GNUNET_MESH_disconnect (struct GNUNET_MESH_Handle *handle)
1322 {
1323   struct GNUNET_MESH_Channel *ch;
1324   struct GNUNET_MESH_Channel *aux;
1325   struct GNUNET_MESH_TransmitHandle *th;
1326
1327   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH DISCONNECT\n");
1328
1329   ch = handle->channels_head;
1330   while (NULL != ch)
1331   {
1332     aux = ch->next;
1333     if (ch->chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
1334     {
1335       GNUNET_break (0);
1336       LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X not destroyed\n", ch->chid);
1337     }
1338     destroy_channel (ch, GNUNET_YES);
1339     ch = aux;
1340   }
1341   while ( (th = handle->th_head) != NULL)
1342   {
1343     struct GNUNET_MessageHeader *msg;
1344
1345     /* Make sure it is an allowed packet (everything else should have been
1346      * already canceled).
1347      */
1348     GNUNET_break (GNUNET_NO == th_is_payload (th));
1349     msg = (struct GNUNET_MessageHeader *) &th[1];
1350     switch (ntohs(msg->type))
1351     {
1352       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT:
1353       case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE:
1354       case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY:
1355       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNELS:
1356       case GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNEL:
1357         break;
1358       default:
1359         GNUNET_break (0);
1360         LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected msg %u\n",
1361              ntohs(msg->type));
1362     }
1363
1364     GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th);
1365     GNUNET_free (th);
1366   }
1367
1368   if (NULL != handle->th)
1369   {
1370     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1371     handle->th = NULL;
1372   }
1373   if (NULL != handle->client)
1374   {
1375     GNUNET_CLIENT_disconnect (handle->client);
1376     handle->client = NULL;
1377   }
1378   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
1379   {
1380     GNUNET_SCHEDULER_cancel(handle->reconnect_task);
1381     handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1382   }
1383   GNUNET_free (handle);
1384 }
1385
1386
1387 /**
1388  * Create a new channel towards a remote peer.
1389  *
1390  * If the destination port is not open by any peer or the destination peer
1391  * does not accept the channel, #GNUNET_MESH_ChannelEndHandler will be called
1392  * for this channel.
1393  *
1394  * @param h mesh handle
1395  * @param channel_ctx client's channel context to associate with the channel
1396  * @param peer peer identity the channel should go to
1397  * @param port Port number.
1398  * @param nobuffer Flag for disabling buffering on relay nodes.
1399  * @param reliable Flag for end-to-end reliability.
1400  *
1401  * @return handle to the channel
1402  */
1403 struct GNUNET_MESH_Channel *
1404 GNUNET_MESH_channel_create (struct GNUNET_MESH_Handle *h,
1405                            void *channel_ctx,
1406                            const struct GNUNET_PeerIdentity *peer,
1407                            uint32_t port,
1408                            int nobuffer,
1409                            int reliable)
1410 {
1411   struct GNUNET_MESH_Channel *ch;
1412   struct GNUNET_MESH_ChannelMessage msg;
1413
1414   LOG (GNUNET_ERROR_TYPE_DEBUG,
1415        "Creating new channel to %s:%u\n",
1416        GNUNET_i2s (peer), port);
1417   ch = create_channel (h, 0);
1418   LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", ch);
1419   LOG (GNUNET_ERROR_TYPE_DEBUG, "  number %X\n", ch->chid);
1420   ch->ctx = channel_ctx;
1421   ch->peer = GNUNET_PEER_intern (peer);
1422   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE);
1423   msg.header.size = htons (sizeof (struct GNUNET_MESH_ChannelMessage));
1424   msg.channel_id = htonl (ch->chid);
1425   msg.port = htonl (port);
1426   msg.peer = *peer;
1427   msg.opt = 0;
1428   if (GNUNET_YES == reliable)
1429     msg.opt |= GNUNET_MESH_OPTION_RELIABLE;
1430   if (GNUNET_YES == nobuffer)
1431     msg.opt |= GNUNET_MESH_OPTION_NOBUFFER;
1432   msg.opt = htonl (msg.opt);
1433   ch->allow_send = 0;
1434   send_packet (h, &msg.header, ch);
1435   return ch;
1436 }
1437
1438
1439 void
1440 GNUNET_MESH_channel_destroy (struct GNUNET_MESH_Channel *channel)
1441 {
1442   struct GNUNET_MESH_Handle *h;
1443   struct GNUNET_MESH_ChannelMessage msg;
1444   struct GNUNET_MESH_TransmitHandle *th;
1445
1446   LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n");
1447   h = channel->mesh;
1448
1449   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY);
1450   msg.header.size = htons (sizeof (struct GNUNET_MESH_ChannelMessage));
1451   msg.channel_id = htonl (channel->chid);
1452   memset (&msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
1453   msg.port = 0;
1454   msg.opt = 0;
1455   th = h->th_head;
1456   while (th != NULL)
1457   {
1458     struct GNUNET_MESH_TransmitHandle *aux;
1459     if (th->channel == channel)
1460     {
1461       aux = th->next;
1462       /* FIXME call the handler? */
1463       if (GNUNET_YES == th_is_payload (th))
1464         th->notify (th->notify_cls, 0, NULL);
1465       GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1466       GNUNET_free (th);
1467       th = aux;
1468     }
1469     else
1470       th = th->next;
1471   }
1472
1473   destroy_channel (channel, GNUNET_YES);
1474   send_packet (h, &msg.header, NULL);
1475 }
1476
1477
1478 /**
1479  * Get information about a channel.
1480  *
1481  * @param channel Channel handle.
1482  * @param option Query (GNUNET_MESH_OPTION_*).
1483  * @param ... dependant on option, currently not used
1484  *
1485  * @return Union with an answer to the query.
1486  */
1487 const union GNUNET_MESH_ChannelInfo *
1488 GNUNET_MESH_channel_get_info (struct GNUNET_MESH_Channel *channel,
1489                              enum MeshOption option, ...)
1490 {
1491   const union GNUNET_MESH_ChannelInfo *ret;
1492
1493   switch (option)
1494   {
1495     case GNUNET_MESH_OPTION_NOBUFFER:
1496       ret = (const union GNUNET_MESH_ChannelInfo *) &channel->nobuffer;
1497       break;
1498     case GNUNET_MESH_OPTION_RELIABLE:
1499       ret = (const union GNUNET_MESH_ChannelInfo *) &channel->reliable;
1500       break;
1501     case GNUNET_MESH_OPTION_OOORDER:
1502       ret = (const union GNUNET_MESH_ChannelInfo *) &channel->ooorder;
1503       break;
1504     case GNUNET_MESH_OPTION_PEER:
1505       ret = (const union GNUNET_MESH_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer);
1506       break;
1507     default:
1508       GNUNET_break (0);
1509       return NULL;
1510   }
1511
1512   return ret;
1513 }
1514
1515 struct GNUNET_MESH_TransmitHandle *
1516 GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Channel *channel, int cork,
1517                                    struct GNUNET_TIME_Relative maxdelay,
1518                                    size_t notify_size,
1519                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1520                                    void *notify_cls)
1521 {
1522   struct GNUNET_MESH_TransmitHandle *th;
1523
1524   GNUNET_assert (NULL != channel);
1525   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY\n");
1526   LOG (GNUNET_ERROR_TYPE_DEBUG, "    on channel %X\n", channel->chid);
1527   LOG (GNUNET_ERROR_TYPE_DEBUG, "    allow_send %d\n", channel->allow_send);
1528   if (channel->chid >= GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
1529     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to origin\n");
1530   else
1531     LOG (GNUNET_ERROR_TYPE_DEBUG, "    to destination\n");
1532   LOG (GNUNET_ERROR_TYPE_DEBUG, "    payload size %u\n", notify_size);
1533   GNUNET_assert (NULL != notify);
1534   GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed
1535   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle));
1536   th->channel = channel;
1537   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1538   th->size = notify_size + sizeof (struct GNUNET_MESH_LocalData);
1539   channel->packet_size = th->size;
1540   LOG (GNUNET_ERROR_TYPE_DEBUG, "    total size %u\n", th->size);
1541   th->notify = notify;
1542   th->notify_cls = notify_cls;
1543   add_to_queue (channel->mesh, th);
1544   if (NULL != channel->mesh->th)
1545     return th;
1546   if (GNUNET_NO == channel->allow_send)
1547     return th;
1548   LOG (GNUNET_ERROR_TYPE_DEBUG, "    call client notify tmt rdy\n");
1549   channel->mesh->th =
1550       GNUNET_CLIENT_notify_transmit_ready (channel->mesh->client, th->size,
1551                                            GNUNET_TIME_UNIT_FOREVER_REL,
1552                                            GNUNET_YES, &send_callback,
1553                                            channel->mesh);
1554   LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY END\n");
1555   return th;
1556 }
1557
1558
1559 void
1560 GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th)
1561 {
1562   struct GNUNET_MESH_Handle *mesh;
1563
1564   th->channel->packet_size = 0;
1565   mesh = th->channel->mesh;
1566   if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1567     GNUNET_SCHEDULER_cancel (th->timeout_task);
1568   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
1569   GNUNET_free (th);
1570   if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
1571   {
1572     /* queue empty, no point in asking for transmission */
1573     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
1574     mesh->th = NULL;
1575   }
1576 }
1577
1578
1579 void
1580 GNUNET_MESH_receive_done (struct GNUNET_MESH_Channel *channel)
1581 {
1582   send_ack (channel);
1583 }
1584
1585
1586 /**
1587  * Request information about the running mesh peer.
1588  * The callback will be called for every channel known to the service,
1589  * listing all active peers that blong to the channel.
1590  *
1591  * If called again on the same handle, it will overwrite the previous
1592  * callback and cls. To retrieve the cls, monitor_cancel must be
1593  * called first.
1594  *
1595  * WARNING: unstable API, likely to change in the future!
1596  *
1597  * @param h Handle to the mesh peer.
1598  * @param callback Function to call with the requested data.
1599  * @param callback_cls Closure for @c callback.
1600  */
1601 void
1602 GNUNET_MESH_get_channels (struct GNUNET_MESH_Handle *h,
1603                          GNUNET_MESH_ChannelsCB callback,
1604                          void *callback_cls)
1605 {
1606   struct GNUNET_MessageHeader msg;
1607
1608   msg.size = htons (sizeof (msg));
1609   msg.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNELS);
1610   send_packet (h, &msg, NULL);
1611   h->channels_cb = callback;
1612   h->channels_cls = callback_cls;
1613 }
1614
1615
1616 /**
1617  * Cancel a monitor request. The monitor callback will not be called.
1618  *
1619  * @param h Mesh handle.
1620  *
1621  * @return Closure given to GNUNET_MESH_monitor, if any.
1622  */
1623 void *
1624 GNUNET_MESH_get_channels_cancel (struct GNUNET_MESH_Handle *h)
1625 {
1626   void *cls;
1627
1628   cls = h->channels_cls;
1629   h->channels_cb = NULL;
1630   h->channels_cls = NULL;
1631   return cls;
1632 }
1633
1634
1635 /**
1636  * Request information about a specific channel of the running mesh peer.
1637  *
1638  * WARNING: unstable API, likely to change in the future!
1639  * FIXME Add destination option.
1640  *
1641  * @param h Handle to the mesh peer.
1642  * @param initiator ID of the owner of the channel.
1643  * @param channel_number Channel number.
1644  * @param callback Function to call with the requested data.
1645  * @param callback_cls Closure for @c callback.
1646  */
1647 void
1648 GNUNET_MESH_show_channel (struct GNUNET_MESH_Handle *h,
1649                          struct GNUNET_PeerIdentity *initiator,
1650                          unsigned int channel_number,
1651                          GNUNET_MESH_ChannelCB callback,
1652                          void *callback_cls)
1653 {
1654   struct GNUNET_MESH_LocalMonitor msg;
1655
1656   msg.header.size = htons (sizeof (msg));
1657   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_CHANNEL);
1658   msg.owner = *initiator;
1659   msg.channel_id = htonl (channel_number);
1660   msg.reserved = 0;
1661   send_packet (h, &msg.header, NULL);
1662   h->channel_cb = callback;
1663   h->channel_cls = callback_cls;
1664 }
1665
1666
1667 /**
1668  * Function called to notify a client about the connection
1669  * begin ready to queue more data.  "buf" will be
1670  * NULL and "size" zero if the connection was closed for
1671  * writing in the meantime.
1672  *
1673  * @param cls closure
1674  * @param size number of bytes available in buf
1675  * @param buf where the callee should write the message
1676  * @return number of bytes written to buf
1677  */
1678 static size_t
1679 mesh_mq_ntr (void *cls, size_t size,
1680              void *buf)
1681 {
1682   struct GNUNET_MQ_Handle *mq = cls;
1683   struct MeshMQState *state = GNUNET_MQ_impl_state (mq);
1684   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
1685   uint16_t msize;
1686
1687   state->th = NULL;
1688   if (NULL == buf)
1689   {
1690     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
1691     return 0;
1692   }
1693   msize = ntohs (msg->size);
1694   GNUNET_assert (msize <= size);
1695   memcpy (buf, msg, msize);
1696   GNUNET_MQ_impl_send_continue (mq);
1697   return msize;
1698 }
1699
1700
1701 /**
1702  * Signature of functions implementing the
1703  * sending functionality of a message queue.
1704  *
1705  * @param mq the message queue
1706  * @param msg the message to send
1707  * @param impl_state state of the implementation
1708  */
1709 static void
1710 mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq,
1711                    const struct GNUNET_MessageHeader *msg, void *impl_state)
1712 {
1713   struct MeshMQState *state = impl_state;
1714
1715   GNUNET_assert (NULL == state->th);
1716   state->th =
1717       GNUNET_MESH_notify_transmit_ready (state->channel,
1718                                          /* FIXME: add option for corking */
1719                                          GNUNET_NO,
1720                                          GNUNET_TIME_UNIT_FOREVER_REL,
1721                                          ntohs (msg->size),
1722                                          mesh_mq_ntr, mq);
1723
1724 }
1725
1726
1727 /**
1728  * Signature of functions implementing the
1729  * destruction of a message queue.
1730  * Implementations must not free 'mq', but should
1731  * take care of 'impl_state'.
1732  *
1733  * @param mq the message queue to destroy
1734  * @param impl_state state of the implementation
1735  */
1736 static void
1737 mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
1738 {
1739   struct MeshMQState *state = impl_state;
1740
1741   if (NULL != state->th)
1742     GNUNET_MESH_notify_transmit_ready_cancel (state->th);
1743
1744   GNUNET_free (state);
1745 }
1746
1747
1748 /**
1749  * Create a message queue for a mesh channel.
1750  * The message queue can only be used to transmit messages,
1751  * not to receive them.
1752  *
1753  * @param channel the channel to create the message qeue for
1754  * @return a message queue to messages over the channel
1755  */
1756 struct GNUNET_MQ_Handle *
1757 GNUNET_MESH_mq_create (struct GNUNET_MESH_Channel *channel)
1758 {
1759   struct GNUNET_MQ_Handle *mq;
1760   struct MeshMQState *state;
1761
1762   state = GNUNET_new (struct MeshMQState);
1763   state->channel = channel;
1764
1765   mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl,
1766                                       mesh_mq_destroy_impl,
1767                                       NULL, /* FIXME: cancel impl. */
1768                                       state,
1769                                       NULL, /* no msg handlers */
1770                                       NULL, /* no err handlers */
1771                                       NULL); /* no handler cls */
1772   return mq;
1773 }
1774