- fix main mesh file
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh_peer.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21
22 #include "platform.h"
23 #include "gnunet_util_lib.h"
24
25 #include "gnunet-service-mesh_peer.h"
26 #include "gnunet-service-mesh_dht.h"
27 #include "gnunet-service-mesh_connection.h"
28 #include "mesh_path.h"
29
30 /******************************************************************************/
31 /********************************   STRUCTS  **********************************/
32 /******************************************************************************/
33
34 /**
35  * Struct containing all information regarding a given peer
36  */
37 struct MeshPeer
38 {
39     /**
40      * ID of the peer
41      */
42   GNUNET_PEER_Id id;
43
44     /**
45      * Last time we heard from this peer
46      */
47   struct GNUNET_TIME_Absolute last_contact;
48
49     /**
50      * Paths to reach the peer, ordered by ascending hop count
51      */
52   struct MeshPeerPath *path_head;
53
54     /**
55      * Paths to reach the peer, ordered by ascending hop count
56      */
57   struct MeshPeerPath *path_tail;
58
59     /**
60      * Handle to stop the DHT search for paths to this peer
61      */
62   struct GMD_search_handle *search_h;
63
64     /**
65      * Tunnel to this peer, if any.
66      */
67   struct MeshTunnel2 *tunnel;
68
69     /**
70      * Connections that go through this peer, indexed by tid;
71      */
72   struct GNUNET_CONTAINER_MultiHashMap *connections;
73
74     /**
75      * Handle for queued transmissions
76      */
77   struct GNUNET_CORE_TransmitHandle *core_transmit;
78
79   /**
80    * Transmission queue to core DLL head
81    */
82   struct MeshPeerQueue *queue_head;
83
84   /**
85    * Transmission queue to core DLL tail
86    */
87   struct MeshPeerQueue *queue_tail;
88
89   /**
90    * How many messages are in the queue to this peer.
91    */
92   unsigned int queue_n;
93 };
94
95
96 /******************************************************************************/
97 /*******************************   GLOBALS  ***********************************/
98 /******************************************************************************/
99
100 /**
101  * Peers known, indexed by PeerIdentity (MeshPeer).
102  */
103 static struct GNUNET_CONTAINER_MultiPeerMap *peers;
104
105 /**
106  * How many peers do we want to remember?
107  */
108 static unsigned long long max_peers;
109
110 /**
111  * Percentage of messages that will be dropped (for test purposes only).
112  */
113 static unsigned long long drop_percent;
114
115 /******************************************************************************/
116 /***************************** CORE CALLBACKS *********************************/
117 /******************************************************************************/
118
119
120 /**
121   * Core callback to write a pre-constructed data packet to core buffer
122   *
123   * @param cls Closure (MeshTransmissionDescriptor with data in "data" member).
124   * @param size Number of bytes available in buf.
125   * @param buf Where the to write the message.
126   *
127   * @return number of bytes written to buf
128   */
129 static size_t
130 send_core_data_raw (void *cls, size_t size, void *buf)
131 {
132   struct GNUNET_MessageHeader *msg = cls;
133   size_t total_size;
134
135   GNUNET_assert (NULL != msg);
136   total_size = ntohs (msg->size);
137
138   if (total_size > size)
139   {
140     GNUNET_break (0);
141     return 0;
142   }
143   memcpy (buf, msg, total_size);
144   GNUNET_free (cls);
145   return total_size;
146 }
147
148
149 /**
150  * Function to send a create connection message to a peer.
151  *
152  * @param c Connection to create.
153  * @param size number of bytes available in buf
154  * @param buf where the callee should write the message
155  * @return number of bytes written to buf
156  */
157 static size_t
158 send_core_connection_create (struct MeshConnection *c, size_t size, void *buf)
159 {
160   struct GNUNET_MESH_ConnectionCreate *msg;
161   struct GNUNET_PeerIdentity *peer_ptr;
162   struct MeshPeerPath *p = c->path;
163   size_t size_needed;
164   int i;
165
166   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION CREATE...\n");
167   size_needed =
168       sizeof (struct GNUNET_MESH_ConnectionCreate) +
169       p->length * sizeof (struct GNUNET_PeerIdentity);
170
171   if (size < size_needed || NULL == buf)
172   {
173     GNUNET_break (0);
174     return 0;
175   }
176   msg = (struct GNUNET_MESH_ConnectionCreate *) buf;
177   msg->header.size = htons (size_needed);
178   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE);
179   msg->cid = c->id;
180
181   peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1];
182   for (i = 0; i < p->length; i++)
183   {
184     GNUNET_PEER_resolve (p->peers[i], peer_ptr++);
185   }
186
187   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
188               "CONNECTION CREATE (%u bytes long) sent!\n", size_needed);
189   return size_needed;
190 }
191
192
193 /**
194  * Creates a path ack message in buf and frees all unused resources.
195  *
196  * @param c Connection to send an ACK on.
197  * @param size number of bytes available in buf
198  * @param buf where the callee should write the message
199  *
200  * @return number of bytes written to buf
201  */
202 static size_t
203 send_core_connection_ack (struct MeshConnection *c, size_t size, void *buf)
204 {
205   struct GNUNET_MESH_ConnectionACK *msg = buf;
206   struct MeshTunnel2 *t = c->t;
207
208   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION ACK...\n");
209   GNUNET_assert (NULL != t);
210   if (sizeof (struct GNUNET_MESH_ConnectionACK) > size)
211   {
212     GNUNET_break (0);
213     return 0;
214   }
215   msg->header.size = htons (sizeof (struct GNUNET_MESH_ConnectionACK));
216   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK);
217   msg->cid = c->id;
218   msg->reserved = 0;
219
220   /* TODO add signature */
221
222   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CONNECTION ACK sent!\n");
223   return sizeof (struct GNUNET_MESH_ConnectionACK);
224 }
225
226
227 /******************************************************************************/
228 /********************************   STATIC  ***********************************/
229 /******************************************************************************/
230
231 /**
232  * Iterator over tunnel hash map entries to destroy the tunnel during shutdown.
233  *
234  * @param cls closure
235  * @param key current key code
236  * @param value value in the hash map
237  * @return #GNUNET_YES if we should continue to iterate,
238  *         #GNUNET_NO if not.
239  */
240 static int
241 shutdown_tunnel (void *cls,
242                  const struct GNUNET_PeerIdentity *key,
243                  void *value)
244 {
245   struct MeshPeer *p = value;
246   struct MeshTunnel2 *t = p->tunnel;
247
248   if (NULL != t)
249     GMT_destroy (t);
250   return GNUNET_YES;
251 }
252
253
254
255 /**
256  * Destroy the peer_info and free any allocated resources linked to it
257  *
258  * @param peer The peer_info to destroy.
259  *
260  * @return GNUNET_OK on success
261  */
262 static int
263 peer_destroy (struct MeshPeer *peer)
264 {
265   struct GNUNET_PeerIdentity id;
266   struct MeshPeerPath *p;
267   struct MeshPeerPath *nextp;
268
269   GNUNET_PEER_resolve (peer->id, &id);
270   GNUNET_PEER_change_rc (peer->id, -1);
271
272   if (GNUNET_YES !=
273     GNUNET_CONTAINER_multipeermap_remove (peers, &id, peer))
274   {
275     GNUNET_break (0);
276     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
277                 "removing peer %s, not in peermap\n", GNUNET_i2s (&id));
278   }
279     if (NULL != peer->search_h)
280     {
281       GMD_search_stop (peer->search_h);
282     }
283       p = peer->path_head;
284       while (NULL != p)
285       {
286         nextp = p->next;
287         GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p);
288         path_destroy (p);
289         p = nextp;
290       }
291         tunnel_destroy_empty (peer->tunnel);
292         GNUNET_free (peer);
293         return GNUNET_OK;
294 }
295
296
297 /**
298  * Returns if peer is used (has a tunnel, is neighbor).
299  *
300  * @peer Peer to check.
301  *
302  * @return GNUNET_YES if peer is in use.
303  */
304 static int
305 peer_is_used (struct MeshPeer *peer)
306 {
307   struct MeshPeerPath *p;
308
309   if (NULL != peer->tunnel)
310     return GNUNET_YES;
311
312   for (p = peer->path_head; NULL != p; p = p->next)
313   {
314     if (p->length < 3)
315       return GNUNET_YES;
316   }
317     return GNUNET_NO;
318 }
319
320
321 /**
322  * Iterator over all the peers to get the oldest timestamp.
323  *
324  * @param cls Closure (unsued).
325  * @param key ID of the peer.
326  * @param value Peer_Info of the peer.
327  */
328 static int
329 peer_get_oldest (void *cls,
330                  const struct GNUNET_PeerIdentity *key,
331                  void *value)
332 {
333   struct MeshPeer *p = value;
334   struct GNUNET_TIME_Absolute *abs = cls;
335
336   /* Don't count active peers */
337   if (GNUNET_YES == peer_is_used (p))
338     return GNUNET_YES;
339
340   if (abs->abs_value_us < p->last_contact.abs_value_us)
341     abs->abs_value_us = p->last_contact.abs_value_us;
342
343   return GNUNET_YES;
344 }
345
346
347 /**
348  * Iterator over all the peers to remove the oldest entry.
349  *
350  * @param cls Closure (unsued).
351  * @param key ID of the peer.
352  * @param value Peer_Info of the peer.
353  */
354 static int
355 peer_timeout (void *cls,
356               const struct GNUNET_PeerIdentity *key,
357               void *value)
358 {
359   struct MeshPeer *p = value;
360   struct GNUNET_TIME_Absolute *abs = cls;
361
362   if (p->last_contact.abs_value_us == abs->abs_value_us &&
363     GNUNET_NO == peer_is_used (p))
364   {
365     peer_destroy (p);
366     return GNUNET_NO;
367   }
368     return GNUNET_YES;
369 }
370
371
372 /**
373  * Delete oldest unused peer.
374  */
375 static void
376 peer_delete_oldest (void)
377 {
378   struct GNUNET_TIME_Absolute abs;
379
380   abs = GNUNET_TIME_UNIT_FOREVER_ABS;
381
382   GNUNET_CONTAINER_multipeermap_iterate (peers,
383                                          &peer_get_oldest,
384                                          &abs);
385   GNUNET_CONTAINER_multipeermap_iterate (peers,
386                                          &peer_timeout,
387                                          &abs);
388 }
389
390
391 /**
392  * Retrieve the MeshPeer stucture associated with the peer, create one
393  * and insert it in the appropriate structures if the peer is not known yet.
394  *
395  * @param peer Full identity of the peer.
396  *
397  * @return Existing or newly created peer info.
398  */
399 static struct MeshPeer *
400 peer_get (const struct GNUNET_PeerIdentity *peer_id)
401 {
402   struct MeshPeer *peer;
403
404   peer = GNUNET_CONTAINER_multipeermap_get (peers, peer_id);
405   if (NULL == peer)
406   {
407     peer = GNUNET_new (struct MeshPeer);
408     if (GNUNET_CONTAINER_multipeermap_size (peers) > max_peers)
409     {
410       peer_delete_oldest ();
411     }
412         GNUNET_CONTAINER_multipeermap_put (peers, peer_id, peer,
413                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
414         peer->id = GNUNET_PEER_intern (peer_id);
415   }
416     peer->last_contact = GNUNET_TIME_absolute_get();
417
418     return peer;
419 }
420
421
422 /**
423  * Retrieve the MeshPeer stucture associated with the peer, create one
424  * and insert it in the appropriate structures if the peer is not known yet.
425  *
426  * @param peer Short identity of the peer.
427  *
428  * @return Existing or newly created peer info.
429  */
430 static struct MeshPeer *
431 peer_get_short (const GNUNET_PEER_Id peer)
432 {
433   return peer_get (GNUNET_PEER_resolve2 (peer));
434 }
435
436
437 /**
438  * Get a cost of a path for a peer considering existing tunnel connections.
439  *
440  * @param peer Peer towards which the path is considered.
441  * @param path Candidate path.
442  *
443  * @return Cost of the path (path length + number of overlapping nodes)
444  */
445 static unsigned int
446 peer_get_path_cost (const struct MeshPeer *peer,
447                     const struct MeshPeerPath *path)
448 {
449   struct MeshConnection *c;
450   unsigned int overlap;
451   unsigned int i;
452   unsigned int j;
453
454   if (NULL == path)
455     return 0;
456
457   overlap = 0;
458   GNUNET_assert (NULL != peer->tunnel);
459
460   for (i = 0; i < path->length; i++)
461   {
462     for (c = peer->tunnel->connection_head; NULL != c; c = c->next)
463     {
464       for (j = 0; j < c->path->length; j++)
465       {
466         if (path->peers[i] == c->path->peers[j])
467         {
468           overlap++;
469           break;
470         }
471       }
472     }
473   }
474   return (path->length + overlap) * (path->score * -1);
475 }
476
477
478 /**
479  * Choose the best path towards a peer considering the tunnel properties.
480  *
481  * @param peer The destination peer.
482  *
483  * @return Best current known path towards the peer, if any.
484  */
485 static struct MeshPeerPath *
486 peer_get_best_path (const struct MeshPeer *peer)
487 {
488   struct MeshPeerPath *best_p;
489   struct MeshPeerPath *p;
490   struct MeshConnection *c;
491   unsigned int best_cost;
492   unsigned int cost;
493
494   best_cost = UINT_MAX;
495   best_p = NULL;
496   for (p = peer->path_head; NULL != p; p = p->next)
497   {
498     for (c = peer->tunnel->connection_head; NULL != c; c = c->next)
499       if (c->path == p)
500         break;
501       if (NULL != c)
502         continue; /* If path is in use in a connection, skip it. */
503
504             if ((cost = peer_get_path_cost (peer, p)) < best_cost)
505             {
506               best_cost = cost;
507               best_p = p;
508             }
509   }
510     return best_p;
511 }
512
513
514 /**
515  * Add the path to the peer and update the path used to reach it in case this
516  * is the shortest.
517  *
518  * @param peer_info Destination peer to add the path to.
519  * @param path New path to add. Last peer must be the peer in arg 1.
520  *             Path will be either used of freed if already known.
521  * @param trusted Do we trust that this path is real?
522  */
523 void
524 peer_add_path (struct MeshPeer *peer_info, struct MeshPeerPath *path,
525                int trusted)
526 {
527   struct MeshPeerPath *aux;
528   unsigned int l;
529   unsigned int l2;
530
531   if ((NULL == peer_info) || (NULL == path))
532   {
533     GNUNET_break (0);
534     path_destroy (path);
535     return;
536   }
537     if (path->peers[path->length - 1] != peer_info->id)
538     {
539       GNUNET_break (0);
540       path_destroy (path);
541       return;
542     }
543       if (2 >= path->length && GNUNET_NO == trusted)
544       {
545         /* Only allow CORE to tell us about direct paths */
546         path_destroy (path);
547         return;
548       }
549         for (l = 1; l < path->length; l++)
550         {
551           if (path->peers[l] == myid)
552           {
553             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "shortening path by %u\n", l);
554             for (l2 = 0; l2 < path->length - l; l2++)
555             {
556               path->peers[l2] = path->peers[l + l2];
557             }
558                   path->length -= l;
559                   l = 1;
560                   path->peers =
561                             GNUNET_realloc (path->peers, path->length * sizeof (GNUNET_PEER_Id));
562           }
563         }
564
565           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding path [%u] to peer %s\n",
566                       path->length, peer2s (peer_info));
567
568           l = path_get_length (path);
569           if (0 == l)
570           {
571             path_destroy (path);
572             return;
573           }
574
575             GNUNET_assert (peer_info->id == path->peers[path->length - 1]);
576             for (aux = peer_info->path_head; aux != NULL; aux = aux->next)
577             {
578               l2 = path_get_length (aux);
579               if (l2 > l)
580               {
581                 GNUNET_CONTAINER_DLL_insert_before (peer_info->path_head,
582                                                     peer_info->path_tail, aux, path);
583                 return;
584               }
585                   else
586                   {
587                     if (l2 == l && memcmp (path->peers, aux->peers, l) == 0)
588                     {
589                       path_destroy (path);
590                       return;
591                     }
592                   }
593             }
594               GNUNET_CONTAINER_DLL_insert_tail (peer_info->path_head, peer_info->path_tail,
595                                                 path);
596               return;
597 }
598
599
600 /**
601  * Add the path to the origin peer and update the path used to reach it in case
602  * this is the shortest.
603  * The path is given in peer_info -> destination, therefore we turn the path
604  * upside down first.
605  *
606  * @param peer_info Peer to add the path to, being the origin of the path.
607  * @param path New path to add after being inversed.
608  *             Path will be either used or freed.
609  * @param trusted Do we trust that this path is real?
610  */
611 static void
612 peer_add_path_to_origin (struct MeshPeer *peer_info,
613                          struct MeshPeerPath *path, int trusted)
614 {
615   if (NULL == path)
616     return;
617   path_invert (path);
618   peer_add_path (peer_info, path, trusted);
619 }
620
621
622 /**
623  * Adds a path to the peer_infos of all the peers in the path
624  *
625  * @param p Path to process.
626  * @param confirmed Whether we know if the path works or not.
627  */
628 static void
629 path_add_to_peers (struct MeshPeerPath *p, int confirmed)
630 {
631   unsigned int i;
632
633   /* TODO: invert and add */
634   for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ;
635   for (i++; i < p->length; i++)
636   {
637     struct MeshPeer *aux;
638     struct MeshPeerPath *copy;
639     
640     aux = peer_get_short (p->peers[i]);
641     copy = path_duplicate (p);
642     copy->length = i + 1;
643     peer_add_path (aux, copy, p->length < 3 ? GNUNET_NO : confirmed);
644   }
645 }
646
647
648 /**
649  * Function to process paths received for a new peer addition. The recorded
650  * paths form the initial tunnel, which can be optimized later.
651  * Called on each result obtained for the DHT search.
652  *
653  * @param cls closure
654  * @param path
655  */
656 static void
657 search_handler (void *cls, struct MeshPeerPath *path)
658 {
659   struct MeshPeer *peer = cls;
660   unsigned int connection_count;
661
662   path_add_to_peers (path, GNUNET_NO);
663
664   /* Count connections */
665   connection_count = GMC_count (peer->tunnel->connection_head);
666
667   /* If we already have 3 (or more (?!)) connections, it's enough */
668   if (3 <= connection_count)
669     return;
670
671   if (peer->tunnel->state == MESH_TUNNEL_SEARCHING)
672   {
673     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ... connect!\n");
674     GMP_connect (peer);
675   }
676   return;
677 }
678
679
680 /**
681  * Core callback to write a queued packet to core buffer
682  *
683  * @param cls Closure (peer info).
684  * @param size Number of bytes available in buf.
685  * @param buf Where the to write the message.
686  *
687  * @return number of bytes written to buf
688  */
689 static size_t
690 queue_send (void *cls, size_t size, void *buf)
691 {
692   struct MeshPeer *peer = cls;
693   struct MeshFlowControl *fc;
694   struct MeshConnection *c;
695   struct GNUNET_MessageHeader *msg;
696   struct MeshPeerQueue *queue;
697   struct MeshTunnel2 *t;
698   struct MeshChannel *ch;
699   const struct GNUNET_PeerIdentity *dst_id;
700   size_t data_size;
701   uint32_t pid;
702   uint16_t type;
703   int fwd;
704
705   peer->core_transmit = NULL;
706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Queue send (max %u)\n", size);
707
708   if (NULL == buf || 0 == size)
709   {
710     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Buffer size 0.\n");
711     return 0;
712   }
713
714   /* Initialize */
715   queue = peer_get_first_message (peer);
716   if (NULL == queue)
717   {
718     GNUNET_break (0); /* Core tmt_rdy should've been canceled */
719     return 0;
720   }
721   c = queue->c;
722   fwd = queue->fwd;
723   fc = fwd ? &c->fwd_fc : &c->bck_fc;
724
725   dst_id = GNUNET_PEER_resolve2 (peer->id);
726   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   towards %s\n", GNUNET_i2s (dst_id));
727   /* Check if buffer size is enough for the message */
728   if (queue->size > size)
729   {
730       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   not enough room, reissue\n");
731       peer->core_transmit =
732           GNUNET_CORE_notify_transmit_ready (core_handle,
733                                              GNUNET_NO,
734                                              0,
735                                              GNUNET_TIME_UNIT_FOREVER_REL,
736                                              dst_id,
737                                              queue->size,
738                                              &queue_send,
739                                              peer);
740       return 0;
741   }
742   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   size %u ok\n", queue->size);
743
744   t = (NULL != c) ? c->t : NULL;
745   type = 0;
746
747   /* Fill buf */
748   switch (queue->type)
749   {
750     case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY:
751     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY:
752     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
753     case GNUNET_MESSAGE_TYPE_MESH_FWD:
754     case GNUNET_MESSAGE_TYPE_MESH_BCK:
755     case GNUNET_MESSAGE_TYPE_MESH_ACK:
756     case GNUNET_MESSAGE_TYPE_MESH_POLL:
757       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
758                   "*   raw: %s\n",
759                   GNUNET_MESH_DEBUG_M2S (queue->type));
760       data_size = send_core_data_raw (queue->cls, size, buf);
761       msg = (struct GNUNET_MessageHeader *) buf;
762       type = ntohs (msg->type);
763       break;
764     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
765       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   path create\n");
766       if (GMC_is_origin (c, GNUNET_YES))
767         data_size = send_core_connection_create (queue->c, size, buf);
768       else
769         data_size = send_core_data_raw (queue->cls, size, buf);
770       break;
771     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
772       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   path ack\n");
773       if (GMC_is_origin (c, GNUNET_NO) ||
774           GMC_is_origin (c, GNUNET_YES))
775         data_size = send_core_connection_ack (queue->c, size, buf);
776       else
777         data_size = send_core_data_raw (queue->cls, size, buf);
778       break;
779     case GNUNET_MESSAGE_TYPE_MESH_DATA:
780     case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE:
781     case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY:
782       /* This should be encapsulted */
783       GNUNET_break (0);
784       data_size = 0;
785       break;
786     default:
787       GNUNET_break (0);
788       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "*   type unknown: %u\n",
789                   queue->type);
790       data_size = 0;
791   }
792
793   if (0 < drop_percent &&
794       GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent)
795   {
796     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
797                 "Dropping message of type %s\n",
798                 GNUNET_MESH_DEBUG_M2S (queue->type));
799     data_size = 0;
800   }
801
802   /* Free queue, but cls was freed by send_core_* */
803   ch = queue->ch;
804   queue_destroy (queue, GNUNET_NO);
805
806   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
807   switch (type)
808   {
809     case GNUNET_MESSAGE_TYPE_MESH_FWD:
810     case GNUNET_MESSAGE_TYPE_MESH_BCK:
811       pid = ntohl ( ((struct GNUNET_MESH_Encrypted *) buf)->pid );
812       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   accounting pid %u\n", pid);
813       fc->last_pid_sent = pid;
814       send_ack (c, ch, fwd);
815       break;
816     default:
817       break;
818   }
819
820   /* If more data in queue, send next */
821   queue = peer_get_first_message (peer);
822   if (NULL != queue)
823   {
824     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   more data!\n");
825     if (NULL == peer->core_transmit) {
826       peer->core_transmit =
827           GNUNET_CORE_notify_transmit_ready(core_handle,
828                                             0,
829                                             0,
830                                             GNUNET_TIME_UNIT_FOREVER_REL,
831                                             dst_id,
832                                             queue->size,
833                                             &queue_send,
834                                             peer);
835     }
836     else
837     {
838       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839                   "*   tmt rdy called somewhere else\n");
840     }
841     if (GNUNET_SCHEDULER_NO_TASK == fc->poll_task)
842     {
843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   starting poll timeout\n");
844       fc->poll_task =
845           GNUNET_SCHEDULER_add_delayed (fc->poll_time, &connection_poll, fc);
846     }
847   }
848   else
849   {
850     if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
851     {
852       GNUNET_SCHEDULER_cancel (fc->poll_task);
853       fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
854     }
855   }
856   if (NULL != c)
857   {
858     c->pending_messages--;
859     if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
860     {
861       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*  destroying connection!\n");
862       GMC_destroy (c);
863     }
864   }
865
866   if (NULL != t)
867   {
868     t->pending_messages--;
869     if (GNUNET_YES == t->destroy && 0 == t->pending_messages)
870     {
871 //       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*  destroying tunnel!\n");
872       tunnel_destroy (t);
873     }
874   }
875   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*  Return %d\n", data_size);
876   return data_size;
877 }
878
879
880
881 /**
882  * Get first sendable message.
883  *
884  * @param peer The destination peer.
885  *
886  * @return Best current known path towards the peer, if any.
887  */
888 static struct MeshPeerQueue *
889 peer_get_first_message (const struct MeshPeer *peer)
890 {
891   struct MeshPeerQueue *q;
892
893   for (q = peer->queue_head; NULL != q; q = q->next)
894   {
895     if (queue_is_sendable (q))
896       return q;
897   }
898
899   return NULL;
900 }
901
902
903 static int
904 queue_is_sendable (struct MeshPeerQueue *q)
905 {
906   struct MeshFlowControl *fc;
907
908   /* Is PID-independent? */
909   switch (q->type)
910   {
911     case GNUNET_MESSAGE_TYPE_MESH_ACK:
912     case GNUNET_MESSAGE_TYPE_MESH_POLL:
913       return GNUNET_YES;
914   }
915
916   /* Is PID allowed? */
917   fc = q->fwd ? &q->c->fwd_fc : &q->c->bck_fc;
918   if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
919     return GNUNET_YES;
920
921   return GNUNET_NO;
922 }
923
924
925 /******************************************************************************/
926 /********************************    API    ***********************************/
927 /******************************************************************************/
928
929
930 /**
931  * Free a transmission that was already queued with all resources
932  * associated to the request.
933  *
934  * @param queue Queue handler to cancel.
935  * @param clear_cls Is it necessary to free associated cls?
936  */
937 void
938 GMP_queue_destroy (struct MeshPeerQueue *queue, int clear_cls)
939 {
940   struct MeshPeer *peer;
941   struct MeshFlowControl *fc;
942   int fwd;
943
944   fwd = queue->fwd;
945   peer = queue->peer;
946   GNUNET_assert (NULL != queue->c);
947   fc = fwd ? &queue->c->fwd_fc : &queue->c->bck_fc;
948
949   if (GNUNET_YES == clear_cls)
950   {
951     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "   queue destroy type %s\n",
952                 GNUNET_MESH_DEBUG_M2S (queue->type));
953     switch (queue->type)
954     {
955       case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY:
956       case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY:
957         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n");
958         GNUNET_break (GNUNET_YES == queue->c->destroy);
959         /* fall through */
960       case GNUNET_MESSAGE_TYPE_MESH_FWD:
961       case GNUNET_MESSAGE_TYPE_MESH_BCK:
962       case GNUNET_MESSAGE_TYPE_MESH_ACK:
963       case GNUNET_MESSAGE_TYPE_MESH_POLL:
964       case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
965       case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
966       case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
967         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "   prebuilt message\n");;
968         GNUNET_free_non_null (queue->cls);
969         break;
970
971       default:
972         GNUNET_break (0);
973         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "   type %s unknown!\n",
974                     GNUNET_MESH_DEBUG_M2S (queue->type));
975     }
976
977   }
978   GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue);
979
980   if (queue->type != GNUNET_MESSAGE_TYPE_MESH_ACK &&
981       queue->type != GNUNET_MESSAGE_TYPE_MESH_POLL)
982   {
983     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  Q_N- %p %u\n", fc, fc->queue_n);
984     fc->queue_n--;
985     peer->queue_n--;
986   }
987   if (NULL != queue->c)
988   {
989     queue->c->pending_messages--;
990     if (NULL != queue->c->t)
991     {
992       queue->c->t->pending_messages--;
993     }
994   }
995
996   GNUNET_free (queue);
997 }
998
999
1000 /**
1001  * @brief Queue and pass message to core when possible.
1002  *
1003  * @param cls Closure (@c type dependant). It will be used by queue_send to
1004  *            build the message to be sent if not already prebuilt.
1005  * @param type Type of the message, 0 for a raw message.
1006  * @param size Size of the message.
1007  * @param c Connection this message belongs to (cannot be NULL).
1008  * @param ch Channel this message belongs to, if applicable (otherwise NULL).
1009  * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!)
1010  */
1011 void
1012 GMP_queue_add (void *cls, uint16_t type, size_t size, 
1013                struct MeshConnection *c,
1014                struct MeshChannel *ch,
1015                int fwd)
1016 {
1017   struct MeshPeerQueue *queue;
1018   struct MeshFlowControl *fc;
1019   struct MeshPeer *peer;
1020   int priority;
1021   int call_core;
1022
1023   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1024               "queue add %s %s (%u) on c %p, ch %p\n",
1025               fwd ? "FWD" : "BCK",  GNUNET_MESH_DEBUG_M2S (type), size, c, ch);
1026   GNUNET_assert (NULL != c);
1027
1028   fc   = fwd ? &c->fwd_fc : &c->bck_fc;
1029   peer = fwd ? connection_get_next_hop (c) : connection_get_prev_hop (c);
1030
1031   if (NULL == fc)
1032   {
1033     GNUNET_break (0);
1034     return;
1035   }
1036
1037   if (NULL == peer->connections)
1038   {
1039     /* We are not connected to this peer, ignore request. */
1040     GNUNET_break_op (0);
1041     return;
1042   }
1043
1044   priority = 0;
1045
1046   if (GNUNET_MESSAGE_TYPE_MESH_POLL == type ||
1047       GNUNET_MESSAGE_TYPE_MESH_ACK == type)
1048   {
1049     priority = 100;
1050   }
1051
1052   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority);
1053   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "fc %p\n", fc);
1054   if (fc->queue_n >= fc->queue_max && 0 == priority)
1055   {
1056     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
1057                               1, GNUNET_NO);
1058     GNUNET_break (0);
1059     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060                 "queue full: %u/%u\n",
1061                 fc->queue_n, fc->queue_max);
1062     return; /* Drop this message */
1063   }
1064
1065   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent);
1066   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "     ack %u\n", fc->last_ack_recv);
1067   if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
1068   {
1069     call_core = GNUNET_NO;
1070     if (GNUNET_SCHEDULER_NO_TASK == fc->poll_task &&
1071         GNUNET_MESSAGE_TYPE_MESH_POLL != type)
1072     {
1073       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074                   "no buffer space (%u > %u): starting poll\n",
1075                   fc->last_pid_sent + 1, fc->last_ack_recv);
1076       fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1077                                                     &connection_poll,
1078                                                     fc);
1079     }
1080   }
1081   else
1082     call_core = GNUNET_YES;
1083   queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
1084   queue->cls = cls;
1085   queue->type = type;
1086   queue->size = size;
1087   queue->peer = peer;
1088   queue->c = c;
1089   queue->ch = ch;
1090   queue->fwd = fwd;
1091   if (100 <= priority)
1092   {
1093     struct MeshPeerQueue *copy;
1094     struct MeshPeerQueue *next;
1095
1096     for (copy = peer->queue_head; NULL != copy; copy = next)
1097     {
1098       next = copy->next;
1099       if (copy->type == type && copy->c == c && copy->fwd == fwd)
1100       {
1101         /* Example: also a FWD ACK for connection XYZ */
1102         queue_destroy (copy, GNUNET_YES);
1103       }
1104     }
1105     GNUNET_CONTAINER_DLL_insert (peer->queue_head, peer->queue_tail, queue);
1106   }
1107   else
1108   {
1109     GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, queue);
1110     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
1111     fc->queue_n++;
1112     peer->queue_n++;
1113   }
1114
1115   if (NULL == peer->core_transmit && GNUNET_YES == call_core)
1116   {
1117     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118                 "calling core tmt rdy towards %s for %u bytes\n",
1119                 peer2s (peer), size);
1120     peer->core_transmit =
1121         GNUNET_CORE_notify_transmit_ready (core_handle,
1122                                            0,
1123                                            0,
1124                                            GNUNET_TIME_UNIT_FOREVER_REL,
1125                                            GNUNET_PEER_resolve2 (peer->id),
1126                                            size,
1127                                            &queue_send,
1128                                            peer);
1129   }
1130   else
1131   {
1132     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133                 "core tmt rdy towards %s already called\n",
1134                 peer2s (peer));
1135
1136   }
1137   c->pending_messages++;
1138   if (NULL != c->t)
1139     c->t->pending_messages++;
1140 }
1141
1142
1143
1144 /**
1145  * Initialize the peer subsystem.
1146  *
1147  * @param c Configuration.
1148  */
1149 void
1150 GMP_init (const struct GNUNET_CONFIGURATION_Handle *c)
1151 {
1152   peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
1153   if (GNUNET_OK !=
1154       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_PEERS",
1155                                              &max_peers))
1156   {
1157     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
1158                                "MESH", "MAX_PEERS", "USING DEFAULT");
1159     max_peers = 1000;
1160   }
1161
1162   if (GNUNET_OK !=
1163       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "DROP_PERCENT",
1164                                              &drop_percent))
1165   {
1166     drop_percent = 0;
1167   }
1168   else
1169   {
1170     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1171                 "\n***************************************\n"
1172                 "Mesh is running with drop mode enabled.\n"
1173                 "This is NOT a good idea!\n"
1174                 "Remove the DROP_PERCENT option from your configuration.\n"
1175                 "***************************************\n");
1176   }
1177 }
1178
1179 /**
1180  * Shut down the peer subsystem.
1181  */
1182 void
1183 GMP_shutdown (void)
1184 {
1185   GNUNET_CONTAINER_multipeermap_iterate (peers, &shutdown_tunnel, NULL);
1186 }
1187
1188
1189 /**
1190  * Try to establish a new connection to this peer in the given tunnel.
1191  * If the peer doesn't have any path to it yet, try to get one.
1192  * If the peer already has some path, send a CREATE CONNECTION towards it.
1193  *
1194  * @param peer PeerInfo of the peer.
1195  */
1196 void
1197 GMP_connect (struct MeshPeer *peer)
1198 {
1199   struct MeshTunnel2 *t;
1200   struct MeshPeerPath *p;
1201   struct MeshConnection *c;
1202   int rerun_search;
1203
1204   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205               "peer_connect towards %s\n",
1206               peer2s (peer));
1207   t = peer->tunnel;
1208   c = NULL;
1209   rerun_search = GNUNET_NO;
1210
1211   if (NULL != peer->path_head)
1212   {
1213     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "path exists\n");
1214     p = peer_get_best_path (peer);
1215     if (NULL != p)
1216     {
1217       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  %u hops\n", p->length);
1218       c = tunnel_use_path (t, p);
1219       if (NULL == c)
1220       {
1221         /* This case can happen when the path includes a first hop that is
1222          * not yet known to be connected.
1223          *
1224          * This happens quite often during testing when running mesh
1225          * under valgrind: core connect notifications come very late and the
1226          * DHT result has already come and created a valid path.
1227          * In this case, the peer->connections hashmap will be NULL and
1228          * tunnel_use_path will not be able to create a connection from that
1229          * path.
1230          *
1231          * Re-running the DHT GET should give core time to callback.
1232          */
1233         GNUNET_break(0);
1234                 rerun_search = GNUNET_YES;
1235       }
1236       else
1237       {
1238         send_connection_create (c);
1239         return;
1240       }
1241     }
1242   }
1243
1244   if (NULL != peer->search_h && GNUNET_YES == rerun_search)
1245   {
1246     GMD_search_stop (peer->search_h);
1247     peer->search_h = NULL;
1248     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1249                 "  Stopping DHT GET for peer %s\n", peer2s (peer));
1250   }
1251
1252   if (NULL == peer->search_h)
1253   {
1254     const struct GNUNET_PeerIdentity *id;
1255
1256     id = GNUNET_PEER_resolve2 (peer->id);
1257     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258                 "  Starting DHT GET for peer %s\n", peer2s (peer));
1259     peer->search_h = GMD_search (id, &search_handler, peer);
1260     if (MESH_TUNNEL_NEW == t->state)
1261       tunnel_change_state (t, MESH_TUNNEL_SEARCHING);
1262   }
1263 }
1264
1265 /**
1266  * Get the static string for a peer ID.
1267  *
1268  * @param peer Peer.
1269  *
1270  * @return Static string for it's ID.
1271  */
1272 const char *
1273 GMP_2s (const struct MeshPeer *peer)
1274 {
1275   if (NULL == peer)
1276     return "(NULL)";
1277   return GNUNET_i2s (GNUNET_PEER_resolve2 (peer->id));
1278 }