SET building is working now ... but SET itself seems to fail
[oweals/gnunet.git] / src / dv / gnunet-service-dv.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dv/gnunet-service-dv.c
23  * @brief the distance vector service, primarily handles gossip of nearby
24  * peers and sending/receiving DV messages from core and decapsulating
25  * them
26  *
27  * @author Christian Grothoff
28  * @author Nathan Evans
29  */
30 #include "platform.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_peerinfo_service.h"
36 #include "gnunet_statistics_service.h"
37 #include "gnunet_set_service.h"
38 #include "gnunet_ats_service.h"
39 #include "dv.h"
40 #include <gcrypt.h>
41
42
43 /**
44  * How often do we establish the consensu?
45  */
46 #define GNUNET_DV_CONSENSUS_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * Maximum number of messages we queue per peer.
50  */
51 #define MAX_QUEUE_SIZE 16
52
53 /**
54  * Maximum number of messages we queue towards the clients/plugin.
55  */
56 #define MAX_QUEUE_SIZE_PLUGIN 1024
57
58 /**
59  * The default fisheye depth, from how many hops away will
60  * we keep peers?
61  */
62 #define DEFAULT_FISHEYE_DEPTH 3
63
64 /**
65  * How many hops is a direct neighbor away?
66  */
67 #define DIRECT_NEIGHBOR_COST 1
68
69
70 GNUNET_NETWORK_STRUCT_BEGIN
71
72 /**
73  * Information about a peer DV can route to.  These entries are what
74  * we use as the binary format to establish consensus to create our
75  * routing table and as the address format in the HELLOs.
76  */
77 struct Target
78 {
79
80   /**
81    * Identity of the peer we can reach.
82    */
83   struct GNUNET_PeerIdentity peer;
84
85   /**
86    * How many hops (1-3) is this peer away? in network byte order
87    */
88   uint32_t distance GNUNET_PACKED;
89
90 };
91
92
93 /**
94  * Message exchanged between DV services (via core), requesting a
95  * message to be routed.
96  */
97 struct RouteMessage
98 {
99   /**
100    * Type: GNUNET_MESSAGE_TYPE_DV_ROUTE
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Expected (remaining) distance.  Must be always smaller than
106    * DEFAULT_FISHEYE_DEPTH, should be zero at the target.  Must
107    * be decremented by one at each hop.  Peers must not forward
108    * these messages further once the counter has reached zero.
109    */
110   uint32_t distance GNUNET_PACKED;
111
112   /**
113    * The (actual) target of the message (this peer, if distance is zero).
114    */
115   struct GNUNET_PeerIdentity target;
116
117   /**
118    * The (actual) sender of the message.
119    */
120   struct GNUNET_PeerIdentity sender;
121
122 };
123
124 GNUNET_NETWORK_STRUCT_END
125
126
127 /**
128  * Linked list of messages to send to clients.
129  */
130 struct PendingMessage
131 {
132   /**
133    * Pointer to next item in the list
134    */
135   struct PendingMessage *next;
136
137   /**
138    * Pointer to previous item in the list
139    */
140   struct PendingMessage *prev;
141
142   /**
143    * Actual message to be sent, allocated after this struct.
144    */
145   const struct GNUNET_MessageHeader *msg;
146
147   /**
148    * Ultimate target for the message.
149    */
150   struct GNUNET_PeerIdentity ultimate_target;
151
152   /**
153    * Unique ID of the message.
154    */
155   uint32_t uid;
156
157 };
158
159
160 /**
161  * Information about a direct neighbor (core-level, excluding
162  * DV-links, only DV-enabled peers).
163  */
164 struct DirectNeighbor
165 {
166
167   /**
168    * Identity of the peer.
169    */
170   struct GNUNET_PeerIdentity peer;
171
172   /**
173    * Session ID we use whenever we create a set union with
174    * this neighbor; constructed from the XOR of our peer
175    * IDs and then salted with "DV-SALT" to avoid conflicts
176    * with other applications.
177    */
178   struct GNUNET_HashCode real_session_id;
179
180   /**
181    * Head of linked list of messages to send to this peer.
182    */
183   struct PendingMessage *pm_head;
184
185   /**
186    * Tail of linked list of messages to send to this peer.
187    */
188   struct PendingMessage *pm_tail;
189
190   /**
191    * Transmit handle to core service.
192    */
193   struct GNUNET_CORE_TransmitHandle *cth;
194
195   /**
196    * Routing table of the neighbor, NULL if not yet established.
197    * Keys are peer identities, values are 'struct Target' entries.
198    * Note that the distances in the targets are from the point-of-view
199    * of the peer, not from us!
200    */
201   struct GNUNET_CONTAINER_MultiPeerMap *neighbor_table;
202
203   /**
204    * Updated routing table of the neighbor, under construction,
205    * NULL if we are not currently building it.
206    * Keys are peer identities, values are 'struct Target' entries.
207    * Note that the distances in the targets are from the point-of-view
208    * of the peer, not from us!
209    */
210   struct GNUNET_CONTAINER_MultiPeerMap *neighbor_table_consensus;
211
212   /**
213    * Our current (exposed) routing table as a set.
214    */
215   struct GNUNET_SET_Handle *my_set;
216
217   /**
218    * Handle for our current active set union operation.
219    */
220   struct GNUNET_SET_OperationHandle *set_op;
221
222   /**
223    * Handle used if we are listening for this peer, waiting for the
224    * other peer to initiate construction of the set union.  NULL if
225    * we ar the initiating peer.
226    */
227   struct GNUNET_SET_ListenHandle *listen_handle;
228
229   /**
230    * ID of the task we use to (periodically) update our consensus
231    * with this peer.  Used if we are the initiating peer.
232    */
233   GNUNET_SCHEDULER_TaskIdentifier initiate_task;
234
235   /**
236    * At what offset are we, with respect to inserting our own routes
237    * into the consensus?
238    */
239   unsigned int consensus_insertion_offset;
240
241   /**
242    * At what distance are we, with respect to inserting our own routes
243    * into the consensus?
244    */
245   unsigned int consensus_insertion_distance;
246
247   /**
248    * Number of messages currently in the 'pm_XXXX'-DLL.
249    */
250   unsigned int pm_queue_size;
251
252   unsigned int consensus_elements;
253
254   /**
255    * Flag set within 'check_target_removed' to trigger full global route refresh.
256    */
257   int target_removed;
258
259   /**
260    * Our distance to this peer, 0 for unknown.
261    */
262   uint32_t distance;
263
264   /**
265    * Is this neighbor connected at the core level?
266    */
267   int connected;
268
269 };
270
271
272 /**
273  * A route includes information about the next hop,
274  * the target, and the ultimate distance to the
275  * target.
276  */
277 struct Route
278 {
279
280   /**
281    * Which peer do we need to forward the message to?
282    */
283   struct DirectNeighbor *next_hop;
284
285   /**
286    * What would be the target, and how far is it away?
287    */
288   struct Target target;
289
290   /**
291    * Offset of this target in the respective consensus set.
292    */
293   unsigned int set_offset;
294
295 };
296
297
298 /**
299  * Set of targets we bring to a consensus; all targets in a set have a
300  * distance equal to the sets distance (which is implied by the array
301  * index of the set).
302  */
303 struct ConsensusSet
304 {
305
306   /**
307    * Array of targets in the set, may include NULL entries if a
308    * neighbor has disconnected; the targets are allocated with the
309    * respective container (all_routes), not here.
310    */
311   struct Route **targets;
312
313   /**
314    * Size of the 'targets' array.
315    */
316   unsigned int array_length;
317
318 };
319
320
321 /**
322  * Peermap of all of our neighbors; processing these usually requires
323  * first checking to see if the peer is core-connected and if the
324  * distance is 1, in which case they are direct neighbors.
325  */
326 static struct GNUNET_CONTAINER_MultiPeerMap *direct_neighbors;
327
328 /**
329  * Hashmap with all routes that we currently support; contains
330  * routing information for all peers from distance 2
331  * up to distance DEFAULT_FISHEYE_DEPTH.
332  */
333 static struct GNUNET_CONTAINER_MultiPeerMap *all_routes;
334
335 /**
336  * Array of consensus sets we expose to the outside world.  Sets
337  * are structured by the distance to the target.
338  */
339 static struct ConsensusSet consensi[DEFAULT_FISHEYE_DEPTH - 1];
340
341 /**
342  * Handle to the core service api.
343  */
344 static struct GNUNET_CORE_Handle *core_api;
345
346 /**
347  * The identity of our peer.
348  */
349 static struct GNUNET_PeerIdentity my_identity;
350
351 /**
352  * The configuration for this service.
353  */
354 static const struct GNUNET_CONFIGURATION_Handle *cfg;
355
356 /**
357  * The client, the DV plugin connected to us (or an event monitor).
358  * Hopefully this client will never change, although if the plugin
359  * dies and returns for some reason it may happen.
360  */
361 static struct GNUNET_SERVER_NotificationContext *nc;
362
363 /**
364  * Handle for the statistics service.
365  */
366 static struct GNUNET_STATISTICS_Handle *stats;
367
368 /**
369  * Handle to ATS service.
370  */
371 static struct GNUNET_ATS_PerformanceHandle *ats;
372
373
374 /**
375  * Start creating a new DV set union by initiating the connection.
376  *
377  * @param cls the 'struct DirectNeighbor' of the peer we're building
378  *        a routing consensus with
379  * @param tc scheduler context
380  */
381 static void
382 initiate_set_union (void *cls,
383                     const struct GNUNET_SCHEDULER_TaskContext *tc);
384
385
386 /**
387  * Start creating a new DV set union construction, our neighbour has
388  * asked for it (callback for listening peer).
389  *
390  * @param cls the 'struct DirectNeighbor' of the peer we're building
391  *        a routing consensus with
392  * @param other_peer the other peer
393  * @param context_msg message with application specific information from
394  *        the other peer
395  * @param request request from the other peer, use GNUNET_SET_accept
396  *        to accept it, otherwise the request will be refused
397  *        Note that we don't use a return value here, as it is also
398  *        necessary to specify the set we want to do the operation with,
399  *        whith sometimes can be derived from the context message.
400  *        Also necessary to specify the timeout.
401  */
402 static void
403 listen_set_union (void *cls,
404                   const struct GNUNET_PeerIdentity *other_peer,
405                   const struct GNUNET_MessageHeader *context_msg,
406                   struct GNUNET_SET_Request *request);
407
408
409 /**
410  * Forward a message from another peer to the plugin.
411  *
412  * @param message the message to send to the plugin
413  * @param origin the original sender of the message
414  * @param distance distance to the original sender of the message
415  */
416 static void
417 send_data_to_plugin (const struct GNUNET_MessageHeader *message,
418                      const struct GNUNET_PeerIdentity *origin,
419                      uint32_t distance)
420 {
421   struct GNUNET_DV_ReceivedMessage *received_msg;
422   size_t size;
423
424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425               "Delivering message from peer `%s'\n",
426               GNUNET_i2s (origin));
427   size = sizeof (struct GNUNET_DV_ReceivedMessage) +
428     ntohs (message->size);
429   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
430   {
431     GNUNET_break (0); /* too big */
432     return;
433   }
434   received_msg = GNUNET_malloc (size);
435   received_msg->header.size = htons (size);
436   received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV);
437   received_msg->distance = htonl (distance);
438   received_msg->sender = *origin;
439   memcpy (&received_msg[1], message, ntohs (message->size));
440   GNUNET_SERVER_notification_context_broadcast (nc,
441                                                 &received_msg->header,
442                                                 GNUNET_YES);
443   GNUNET_free (received_msg);
444 }
445
446
447 /**
448  * Forward a control message to the plugin.
449  *
450  * @param message the message to send to the plugin
451  */
452 static void
453 send_control_to_plugin (const struct GNUNET_MessageHeader *message)
454 {
455   GNUNET_SERVER_notification_context_broadcast (nc,
456                                                 message,
457                                                 GNUNET_NO);
458 }
459
460
461 /**
462  * Give an (N)ACK message to the plugin, we transmitted a message for it.
463  *
464  * @param target peer that received the message
465  * @param uid plugin-chosen UID for the message
466  * @param nack GNUNET_NO to send ACK, GNUNET_YES to send NACK
467  */
468 static void
469 send_ack_to_plugin (const struct GNUNET_PeerIdentity *target,
470                     uint32_t uid,
471                     int nack)
472 {
473   struct GNUNET_DV_AckMessage ack_msg;
474
475   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476               "Delivering ACK for message to peer `%s'\n",
477               GNUNET_i2s (target));
478   ack_msg.header.size = htons (sizeof (ack_msg));
479   ack_msg.header.type = htons ((GNUNET_YES == nack)
480                                ? GNUNET_MESSAGE_TYPE_DV_SEND_NACK
481                                : GNUNET_MESSAGE_TYPE_DV_SEND_ACK);
482   ack_msg.uid = htonl (uid);
483   ack_msg.target = *target;
484   send_control_to_plugin (&ack_msg.header);
485 }
486
487
488 /**
489  * Send a DISTANCE_CHANGED message to the plugin.
490  *
491  * @param peer peer with a changed distance
492  * @param distance new distance to the peer
493  */
494 static void
495 send_distance_change_to_plugin (const struct GNUNET_PeerIdentity *peer,
496                                 uint32_t distance)
497 {
498   struct GNUNET_DV_DistanceUpdateMessage du_msg;
499
500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501               "Delivering DISTANCE_CHANGED for message about peer `%s'\n",
502               GNUNET_i2s (peer));
503   du_msg.header.size = htons (sizeof (du_msg));
504   du_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED);
505   du_msg.distance = htonl (distance);
506   du_msg.peer = *peer;
507   send_control_to_plugin (&du_msg.header);
508 }
509
510
511 /**
512  * Give a CONNECT message to the plugin.
513  *
514  * @param target peer that connected
515  * @param distance distance to the target
516  */
517 static void
518 send_connect_to_plugin (const struct GNUNET_PeerIdentity *target,
519                         uint32_t distance)
520 {
521   struct GNUNET_DV_ConnectMessage cm;
522
523   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524               "Delivering CONNECT about peer `%s'\n",
525               GNUNET_i2s (target));
526   cm.header.size = htons (sizeof (cm));
527   cm.header.type = htons (GNUNET_MESSAGE_TYPE_DV_CONNECT);
528   cm.distance = htonl (distance);
529   cm.peer = *target;
530   send_control_to_plugin (&cm.header);
531 }
532
533
534 /**
535  * Give a DISCONNECT message to the plugin.
536  *
537  * @param target peer that disconnected
538  */
539 static void
540 send_disconnect_to_plugin (const struct GNUNET_PeerIdentity *target)
541 {
542   struct GNUNET_DV_DisconnectMessage dm;
543
544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545               "Delivering DISCONNECT about peer `%s'\n",
546               GNUNET_i2s (target));
547   dm.header.size = htons (sizeof (dm));
548   dm.header.type = htons (GNUNET_MESSAGE_TYPE_DV_DISCONNECT);
549   dm.reserved = htonl (0);
550   dm.peer = *target;
551   send_control_to_plugin (&dm.header);
552 }
553
554
555 /**
556  * Function called to transfer a message to another peer
557  * via core.
558  *
559  * @param cls closure with the direct neighbor
560  * @param size number of bytes available in buf
561  * @param buf where the callee should write the message
562  * @return number of bytes written to buf
563  */
564 static size_t
565 core_transmit_notify (void *cls, size_t size, void *buf)
566 {
567   struct DirectNeighbor *dn = cls;
568   char *cbuf = buf;
569   struct PendingMessage *pending;
570   size_t off;
571   size_t msize;
572
573   dn->cth = NULL;
574   if (NULL == buf)
575   {
576     /* peer disconnected */
577     return 0;
578   }
579   off = 0;
580   pending = dn->pm_head;
581   off = 0;
582   while ( (NULL != (pending = dn->pm_head)) &&
583           (size >= off + (msize = ntohs (pending->msg->size))))
584   {
585     dn->pm_queue_size--;
586     GNUNET_CONTAINER_DLL_remove (dn->pm_head,
587                                  dn->pm_tail,
588                                  pending);
589     memcpy (&cbuf[off], pending->msg, msize);
590     if (0 != pending->uid)
591       send_ack_to_plugin (&pending->ultimate_target,
592                           pending->uid,
593                           GNUNET_NO);
594     GNUNET_free (pending);
595     off += msize;
596   }
597   if (NULL != dn->pm_head)
598     dn->cth =
599       GNUNET_CORE_notify_transmit_ready (core_api,
600                                          GNUNET_YES /* cork */,
601                                          0 /* priority */,
602                                          GNUNET_TIME_UNIT_FOREVER_REL,
603                                          &dn->peer,
604                                          msize,                                 
605                                          &core_transmit_notify, dn);
606   return off;
607 }
608
609
610 /**
611  * Forward the given payload to the given target.
612  *
613  * @param target where to send the message
614  * @param uid unique ID for the message
615  * @param ultimate_target ultimate recipient for the message
616  * @param distance expected (remaining) distance to the target
617  * @param sender original sender of the message
618  * @param payload payload of the message
619  */
620 static void
621 forward_payload (struct DirectNeighbor *target,
622                  uint32_t distance,
623                  uint32_t uid,
624                  const struct GNUNET_PeerIdentity *sender,
625                  const struct GNUNET_PeerIdentity *ultimate_target,
626                  const struct GNUNET_MessageHeader *payload)
627 {
628   struct PendingMessage *pm;
629   struct RouteMessage *rm;
630   size_t msize;
631
632   if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
633        (0 != memcmp (sender,
634                      &my_identity,
635                      sizeof (struct GNUNET_PeerIdentity))) )
636   {
637     GNUNET_break (0 == uid);
638     return;
639   }
640   msize = sizeof (struct RouteMessage) + ntohs (payload->size);
641   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
642   {
643     GNUNET_break (0);
644     return;
645   }
646   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
647   pm->ultimate_target = *ultimate_target;
648   pm->uid = uid;
649   pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
650   rm = (struct RouteMessage *) &pm[1];
651   rm->header.size = htons ((uint16_t) msize);
652   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_ROUTE);
653   rm->distance = htonl (distance);
654   rm->target = target->peer;
655   rm->sender = *sender;
656   memcpy (&rm[1], payload, ntohs (payload->size));
657   GNUNET_CONTAINER_DLL_insert_tail (target->pm_head,
658                                     target->pm_tail,
659                                     pm);
660   target->pm_queue_size++;
661   if (NULL == target->cth)
662     target->cth = GNUNET_CORE_notify_transmit_ready (core_api,
663                                                      GNUNET_YES /* cork */,
664                                                      0 /* priority */,
665                                                      GNUNET_TIME_UNIT_FOREVER_REL,
666                                                      &target->peer,
667                                                      msize,                                     
668                                                      &core_transmit_notify, target);
669 }
670
671
672 /**
673  * Find a free slot for storing a 'route' in the 'consensi'
674  * set at the given distance.
675  *
676  * @param distance distance to use for the set slot
677  */
678 static unsigned int
679 get_consensus_slot (uint32_t distance)
680 {
681   struct ConsensusSet *cs;
682   unsigned int i;
683
684   cs = &consensi[distance];
685   i = 0;
686   while ( (i < cs->array_length) &&
687           (NULL != cs->targets[i]) ) i++;
688   if (i == cs->array_length)
689   {
690     GNUNET_array_grow (cs->targets,
691                        cs->array_length,
692                        cs->array_length * 2 + 2);
693   }
694   return i;
695 }
696
697
698 /**
699  * Allocate a slot in the consensus set for a route.
700  *
701  * @param route route to initialize
702  * @param distance which consensus set to use
703  */
704 static void
705 allocate_route (struct Route *route,
706                 uint32_t distance)
707 {
708   unsigned int i;
709
710   i = get_consensus_slot (distance);
711   route->set_offset = i;
712   consensi[distance].targets[i] = route;
713   route->target.distance = htonl (distance);
714 }
715
716
717 /**
718  * Release a slot in the consensus set for a route.
719  *
720  * @param route route to release the slot from
721  */
722 static void
723 release_route (struct Route *route)
724 {
725   consensi[ntohl (route->target.distance)].targets[route->set_offset] = NULL;
726   route->set_offset = UINT_MAX; /* indicate invalid slot */
727 }
728
729
730 /**
731  * Move a route from one consensus set to another.
732  *
733  * @param route route to move
734  * @param new_distance new distance for the route (destination set)
735  */
736 static void
737 move_route (struct Route *route,
738             uint32_t new_distance)
739 {
740   unsigned int i;
741
742   release_route (route);
743   i = get_consensus_slot (new_distance);
744   route->set_offset = i;
745   consensi[new_distance].targets[i] = route;
746   route->target.distance = htonl (new_distance);
747 }
748
749
750 /**
751  * Initialize this neighbors 'my_set' and when done give
752  * it to the pending set operation for execution.
753  *
754  * Add a single element to the set per call:
755  *
756  * If we reached the last element of a consensus element: increase distance
757  *
758  *
759  * @param cls the neighbor for which we are building the set
760  */
761 static void
762 build_set (void *cls)
763 {
764   struct DirectNeighbor *neighbor = cls;
765   struct GNUNET_SET_Element element;
766   struct Target *target;
767
768   target = NULL;
769   while ( (DEFAULT_FISHEYE_DEPTH - 1 > neighbor->consensus_insertion_distance) &&
770           (consensi[neighbor->consensus_insertion_distance].array_length == neighbor->consensus_insertion_offset) )
771   {
772     /* If we reached the last element of a consensus array element: increase distance and start with next array */
773     neighbor->consensus_insertion_offset = 0;
774     neighbor->consensus_insertion_distance++;
775
776     /* skip over NULL entries */
777     while ( (DEFAULT_FISHEYE_DEPTH - 1 > neighbor->consensus_insertion_distance) &&
778             (consensi[neighbor->consensus_insertion_distance].array_length  > neighbor->consensus_insertion_offset) &&
779             (NULL == consensi[neighbor->consensus_insertion_distance].targets[neighbor->consensus_insertion_offset]) )
780       neighbor->consensus_insertion_offset++;
781   }
782   if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance)
783   {
784     /* we have added all elements to the set, run the operation */
785     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786                 "Finished building my SET for peer `%s' with %u elements, committing\n",
787                 GNUNET_i2s(&neighbor->peer),
788                 neighbor->consensus_elements);
789     GNUNET_SET_commit (neighbor->set_op,
790                        neighbor->my_set);
791     GNUNET_SET_destroy (neighbor->my_set);
792     neighbor->my_set = NULL;
793     return;
794   }
795
796   target = &consensi[neighbor->consensus_insertion_distance].targets[neighbor->consensus_insertion_offset]->target;
797   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
798               "Adding peer `%s' with distance %u to SET\n",
799               GNUNET_i2s (&target->peer),
800               ntohl (target->distance));
801   element.size = sizeof (struct Target);
802   element.type = htons (0); /* do we need this? */
803   element.data = target;
804   neighbor->consensus_elements++;
805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806               "Adding element to SET\n");
807   GNUNET_SET_add_element (neighbor->my_set,
808                           &element,
809                           &build_set, neighbor);
810
811   /* Find next non-NULL entry */
812   neighbor->consensus_insertion_offset++;
813   /* skip over NULL entries */
814   while ( (DEFAULT_FISHEYE_DEPTH - 1 > neighbor->consensus_insertion_distance) &&
815           (consensi[neighbor->consensus_insertion_distance].array_length > neighbor->consensus_insertion_offset) &&
816           (NULL == consensi[neighbor->consensus_insertion_distance].targets[neighbor->consensus_insertion_offset]) )
817     neighbor->consensus_insertion_offset++;
818 }
819
820
821 /**
822  * A peer is now connected to us at distance 1.  Initiate DV exchange.
823  *
824  * @param neighbor entry for the neighbor at distance 1
825  */
826 static void
827 handle_direct_connect (struct DirectNeighbor *neighbor)
828 {
829   struct Route *route;
830   struct GNUNET_HashCode h1;
831   struct GNUNET_HashCode h2;
832   struct GNUNET_HashCode session_id;
833
834   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
835               "Direct connection to %s established, routing table exchange begins.\n",
836               GNUNET_i2s (&neighbor->peer));
837   GNUNET_STATISTICS_update (stats,
838                             "# peers connected (1-hop)",
839                             1, GNUNET_NO);
840   route = GNUNET_CONTAINER_multipeermap_get (all_routes,
841                                              &neighbor->peer);
842   if (NULL != route)
843   {
844     send_disconnect_to_plugin (&neighbor->peer);
845     release_route (route);
846     GNUNET_free (route);
847   }
848
849   route = GNUNET_new (struct Route);
850   route->next_hop = neighbor;
851   route->target.peer= neighbor->peer;
852   route->target.distance = DIRECT_NEIGHBOR_COST;
853   allocate_route (route, DIRECT_NEIGHBOR_COST);
854
855   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856               "Adding direct route to %s\n",
857               GNUNET_i2s (&route->target.peer));
858
859
860   /* construct session ID seed as XOR of both peer's identities */
861   GNUNET_CRYPTO_hash (&my_identity, sizeof (my_identity), &h1);
862   GNUNET_CRYPTO_hash (&neighbor->peer, sizeof (struct GNUNET_PeerIdentity), &h2);
863   GNUNET_CRYPTO_hash_xor (&h1,
864                           &h2,
865                           &session_id);
866   /* make sure session ID is unique across applications by salting it with 'DV' */
867   GNUNET_CRYPTO_hkdf (&neighbor->real_session_id, sizeof (struct GNUNET_HashCode),
868                       GCRY_MD_SHA512, GCRY_MD_SHA256,
869                       "DV-SALT", 2,
870                       &session_id, sizeof (session_id),
871                       NULL, 0);
872   if (0 < memcmp (&neighbor->peer,
873                   &my_identity,
874                   sizeof (struct GNUNET_PeerIdentity)))
875   {
876     neighbor->initiate_task = GNUNET_SCHEDULER_add_now (&initiate_set_union,
877                                                         neighbor);
878   }
879   else
880   {
881     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882                 "Starting SET listen operation\n");
883     neighbor->listen_handle = GNUNET_SET_listen (cfg,
884                                                  GNUNET_SET_OPERATION_UNION,
885                                                  &neighbor->real_session_id,
886                                                  &listen_set_union,
887                                                  neighbor);
888   }
889 }
890
891
892 /**
893  * Method called whenever a peer connects.
894  *
895  * @param cls closure
896  * @param peer peer identity this notification is about
897  */
898 static void
899 handle_core_connect (void *cls,
900                      const struct GNUNET_PeerIdentity *peer)
901 {
902   struct DirectNeighbor *neighbor;
903
904   /* Check for connect to self message */
905   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
906     return;
907   /* check if entry exists */
908   neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors,
909                                                 peer);
910   if (NULL != neighbor)
911   {
912     GNUNET_break (GNUNET_YES != neighbor->connected);
913     neighbor->connected = GNUNET_YES;
914     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
915                 "Core connected to %s (distance %u)\n",
916                 GNUNET_i2s (peer),
917                 (unsigned int) neighbor->distance);
918     if (DIRECT_NEIGHBOR_COST != neighbor->distance)
919       return;
920     handle_direct_connect (neighbor);
921     return;
922   }
923   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
924               "Core connected to %s (distance unknown)\n",
925               GNUNET_i2s (peer));
926   neighbor = GNUNET_new (struct DirectNeighbor);
927   neighbor->peer = *peer;
928   GNUNET_assert (GNUNET_YES ==
929                  GNUNET_CONTAINER_multipeermap_put (direct_neighbors,
930                                                     peer,
931                                                     neighbor,
932                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
933   neighbor->connected = GNUNET_YES;
934   neighbor->distance = 0; /* unknown */
935 }
936
937
938 /**
939  * Called for each 'target' in a neighbor table to free the associated memory.
940  *
941  * @param cls NULL
942  * @param key key of the value
943  * @param value value to free
944  * @return GNUNET_OK to continue to iterate
945  */
946 static int
947 free_targets (void *cls,
948               const struct GNUNET_PeerIdentity *key,
949               void *value)
950 {
951   GNUNET_free (value);
952   return GNUNET_OK;
953 }
954
955
956 /**
957  * Multipeerhmap iterator for checking if a given route is
958  * (now) useful to this peer.
959  *
960  * @param cls the direct neighbor for the given route
961  * @param key key value stored under
962  * @param value a 'struct Target' that may or may not be useful; not that
963  *        the distance in 'target' does not include the first hop yet
964  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
965  */
966 static int
967 check_possible_route (void *cls,
968                       const struct GNUNET_PeerIdentity *key,
969                       void *value)
970 {
971   struct DirectNeighbor *neighbor = cls;
972   struct Target *target = value;
973   struct Route *route;
974
975   route = GNUNET_CONTAINER_multipeermap_get (all_routes,
976                                              key);
977   if (NULL != route)
978   {
979     if (ntohl (route->target.distance) > ntohl (target->distance) + 1)
980     {
981       /* this 'target' is cheaper than the existing route; switch to alternative route! */
982       move_route (route, ntohl (target->distance) + 1);
983       route->next_hop = neighbor;
984       send_distance_change_to_plugin (&target->peer, ntohl (target->distance) + 1);
985     }
986     return GNUNET_YES; /* got a route to this target already */
987   }
988   route = GNUNET_new (struct Route);
989   route->next_hop = neighbor;
990   route->target.distance = htonl (ntohl (target->distance) + 1);
991   route->target.peer = target->peer;
992   allocate_route (route, ntohl (route->target.distance));
993   GNUNET_assert (GNUNET_YES ==
994                  GNUNET_CONTAINER_multipeermap_put (all_routes,
995                                                     &route->target.peer,
996                                                     route,
997                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
998   send_connect_to_plugin (&route->target.peer, ntohl (target->distance));
999   return GNUNET_YES;
1000 }
1001
1002
1003 /**
1004  * Multipeermap iterator for finding routes that were previously
1005  * "hidden" due to a better route (called after a disconnect event).
1006  *
1007  * @param cls NULL
1008  * @param key peer identity of the given direct neighbor
1009  * @param value a 'struct DirectNeighbor' to check for additional routes
1010  * @return GNUNET_YES to continue iteration
1011  */
1012 static int
1013 refresh_routes (void *cls,
1014                 const struct GNUNET_PeerIdentity *key,
1015                 void *value)
1016 {
1017   struct DirectNeighbor *neighbor = value;
1018
1019   if ( (GNUNET_YES != neighbor->connected) ||
1020        (DIRECT_NEIGHBOR_COST != neighbor->distance) )
1021     return GNUNET_YES;
1022   if (NULL != neighbor->neighbor_table)
1023     GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table,
1024                                            &check_possible_route,
1025                                            neighbor);
1026   return GNUNET_YES;
1027 }
1028
1029
1030 /**
1031  * Get distance information from 'atsi'.
1032  *
1033  * @param atsi performance data
1034  * @param atsi_count number of entries in atsi
1035  * @return connected transport distance
1036  */
1037 static uint32_t
1038 get_atsi_distance (const struct GNUNET_ATS_Information *atsi,
1039                    uint32_t atsi_count)
1040 {
1041   uint32_t i;
1042
1043   for (i = 0; i < atsi_count; i++)
1044     if (ntohl (atsi[i].type) == GNUNET_ATS_QUALITY_NET_DISTANCE)
1045       return (0 == ntohl (atsi->value)) ? DIRECT_NEIGHBOR_COST : ntohl (atsi->value); // FIXME: 0 check should not be required once ATS is fixed!
1046   /* If we do not have explicit distance data, assume direct neighbor. */
1047   return DIRECT_NEIGHBOR_COST;
1048 }
1049
1050
1051 /**
1052  * Multipeermap iterator for freeing routes that go via a particular
1053  * neighbor that disconnected and is thus no longer available.
1054  *
1055  * @param cls the direct neighbor that is now unavailable
1056  * @param key key value stored under
1057  * @param value a 'struct Route' that may or may not go via neighbor
1058  *
1059  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
1060  */
1061 static int
1062 cull_routes (void *cls,
1063              const struct GNUNET_PeerIdentity *key,
1064              void *value)
1065 {
1066   struct DirectNeighbor *neighbor = cls;
1067   struct Route *route = value;
1068
1069   if (route->next_hop != neighbor)
1070     return GNUNET_YES; /* not affected */
1071   GNUNET_assert (GNUNET_YES ==
1072                  GNUNET_CONTAINER_multipeermap_remove (all_routes, key, value));
1073   release_route (route);
1074   send_disconnect_to_plugin (&route->target.peer);
1075   GNUNET_free (route);
1076   return GNUNET_YES;
1077 }
1078
1079
1080 /**
1081  * Handle the case that a direct connection to a peer is
1082  * disrupted.  Remove all routes via that peer and
1083  * stop the consensus with it.
1084  *
1085  * @param neighbor peer that was disconnected (or at least is no
1086  *    longer at distance 1)
1087  */
1088 static void
1089 handle_direct_disconnect (struct DirectNeighbor *neighbor)
1090 {
1091   GNUNET_CONTAINER_multipeermap_iterate (all_routes,
1092                                          &cull_routes,
1093                                          neighbor);
1094   if (NULL != neighbor->cth)
1095   {
1096     GNUNET_CORE_notify_transmit_ready_cancel (neighbor->cth);
1097     neighbor->cth = NULL;
1098   }
1099   if (NULL != neighbor->neighbor_table_consensus)
1100   {
1101     GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table_consensus,
1102                                            &free_targets,
1103                                            NULL);
1104     GNUNET_CONTAINER_multipeermap_destroy (neighbor->neighbor_table_consensus);
1105     neighbor->neighbor_table_consensus = NULL;
1106   }
1107   if (NULL != neighbor->neighbor_table)
1108   {
1109     GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table,
1110                                            &free_targets,
1111                                            NULL);
1112     GNUNET_CONTAINER_multipeermap_destroy (neighbor->neighbor_table);
1113     neighbor->neighbor_table = NULL;
1114   }
1115   if (NULL != neighbor->set_op)
1116   {
1117     GNUNET_SET_operation_cancel (neighbor->set_op);
1118     neighbor->set_op = NULL;
1119   }
1120   if (NULL != neighbor->my_set)
1121   {
1122     GNUNET_SET_destroy (neighbor->my_set);
1123     neighbor->my_set = NULL;
1124   }
1125   if (NULL != neighbor->listen_handle)
1126   {
1127     GNUNET_SET_listen_cancel (neighbor->listen_handle);
1128     neighbor->listen_handle = NULL;
1129   }
1130   if (GNUNET_SCHEDULER_NO_TASK != neighbor->initiate_task)
1131   {
1132     GNUNET_SCHEDULER_cancel (neighbor->initiate_task);
1133     neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
1134   }
1135 }
1136
1137
1138 /**
1139  * Function that is called with QoS information about an address; used
1140  * to update our current distance to another peer.
1141  *
1142  * @param cls closure
1143  * @param address the address
1144  * @param active is this address in active use
1145  * @param bandwidth_out assigned outbound bandwidth for the connection
1146  * @param bandwidth_in assigned inbound bandwidth for the connection
1147  * @param ats performance data for the address (as far as known)
1148  * @param ats_count number of performance records in 'ats'
1149  */
1150 static void
1151 handle_ats_update (void *cls,
1152                    const struct GNUNET_HELLO_Address *address,
1153                    int active,
1154                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1155                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1156                    const struct GNUNET_ATS_Information *ats,
1157                    uint32_t ats_count)
1158 {
1159   struct DirectNeighbor *neighbor;
1160   uint32_t distance;
1161
1162   if (GNUNET_NO == active)
1163         return;
1164   distance = get_atsi_distance (ats, ats_count);
1165   /*
1166   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167               "ATS says distance to %s is %u\n",
1168               GNUNET_i2s (&address->peer),
1169               (unsigned int) distance);*/
1170   /* check if entry exists */
1171   neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors,
1172                                                 &address->peer);
1173   if (NULL != neighbor)
1174   {
1175     if ( (DIRECT_NEIGHBOR_COST == neighbor->distance) &&
1176          (DIRECT_NEIGHBOR_COST == distance) )
1177       return; /* no change */
1178     if (DIRECT_NEIGHBOR_COST == neighbor->distance)
1179     {
1180       neighbor->distance = distance;
1181       GNUNET_STATISTICS_update (stats,
1182                                 "# peers connected (1-hop)",
1183                                 -1, GNUNET_NO);
1184       handle_direct_disconnect (neighbor);
1185       GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
1186                                              &refresh_routes,
1187                                              NULL);
1188       return;
1189     }
1190     neighbor->distance = distance;
1191     if (DIRECT_NEIGHBOR_COST != neighbor->distance)
1192       return;
1193     if (GNUNET_YES != neighbor->connected)
1194       return;
1195     handle_direct_connect (neighbor);
1196     return;
1197   }
1198   neighbor = GNUNET_new (struct DirectNeighbor);
1199   neighbor->peer = address->peer;
1200   GNUNET_assert (GNUNET_YES ==
1201                  GNUNET_CONTAINER_multipeermap_put (direct_neighbors,
1202                                                     &address->peer,
1203                                                     neighbor,
1204                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1205   neighbor->connected = GNUNET_NO; /* not yet */
1206   neighbor->distance = distance;
1207 }
1208
1209
1210 /**
1211  * Check if a target was removed from the set of the other peer; if so,
1212  * if we also used it for our route, we need to remove it from our
1213  * 'all_routes' set (and later check if an alternative path now exists).
1214  *
1215  * @param cls the 'struct DirectNeighbor'
1216  * @param key peer identity for the target
1217  * @param value a 'struct Target' previously reachable via the given neighbor
1218  */
1219 static int
1220 check_target_removed (void *cls,
1221                       const struct GNUNET_PeerIdentity *key,
1222                       void *value)
1223 {
1224   struct DirectNeighbor *neighbor = cls;
1225   struct Target *new_target;
1226   struct Route *current_route;
1227
1228   new_target = GNUNET_CONTAINER_multipeermap_get (neighbor->neighbor_table_consensus,
1229                                                   key);
1230   if (NULL == new_target)
1231   {
1232     /* target was revoked, check if it was used */
1233     current_route = GNUNET_CONTAINER_multipeermap_get (all_routes,
1234                                                        key);
1235     if ( (NULL == current_route) ||
1236          (current_route->next_hop != neighbor) )
1237     {
1238       /* didn't matter, wasn't used */
1239       return GNUNET_OK;
1240     }
1241     /* remove existing route */
1242     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1243                 "Lost route to %s\n",
1244                 GNUNET_i2s (&current_route->target.peer));
1245     GNUNET_assert (GNUNET_YES ==
1246                    GNUNET_CONTAINER_multipeermap_remove (all_routes, key, current_route));
1247     send_disconnect_to_plugin (&current_route->target.peer);
1248     GNUNET_free (current_route);
1249     neighbor->target_removed = GNUNET_YES;
1250     return GNUNET_OK;
1251   }
1252   return GNUNET_OK;
1253 }
1254
1255
1256 /**
1257  * Check if a target was added to the set of the other peer; if it
1258  * was added or impoves the existing route, do the needed updates.
1259  *
1260  * @param cls the 'struct DirectNeighbor'
1261  * @param key peer identity for the target
1262  * @param value a 'struct Target' now reachable via the given neighbor
1263  */
1264 static int
1265 check_target_added (void *cls,
1266                     const struct GNUNET_PeerIdentity *key,
1267                     void *value)
1268 {
1269   struct DirectNeighbor *neighbor = cls;
1270   struct Target *target = value;
1271   struct Route *current_route;
1272
1273   /* target was revoked, check if it was used */
1274   current_route = GNUNET_CONTAINER_multipeermap_get (all_routes,
1275                                                      key);
1276   if (NULL != current_route)
1277   {
1278     /* route exists */
1279     if (current_route->next_hop == neighbor)
1280     {
1281       /* we had the same route before, no change */
1282       if (ntohl (target->distance) + 1 != ntohl (current_route->target.distance))
1283       {
1284         current_route->target.distance = htonl (ntohl (target->distance) + 1);
1285         send_distance_change_to_plugin (&target->peer, ntohl (target->distance) + 1);
1286       }
1287       return GNUNET_OK;
1288     }
1289     if (ntohl (current_route->target.distance) >= ntohl (target->distance) + 1)
1290     {
1291       /* alternative, shorter route exists, ignore */
1292       return GNUNET_OK;
1293     }
1294     /* new route is better than the existing one, take over! */
1295     /* NOTE: minor security issue: malicious peers may advertise
1296        very short routes to take over longer paths; as we don't
1297        check that the shorter routes actually work, a malicious
1298        direct neighbor can use this to DoS our long routes */
1299     current_route->next_hop = neighbor;
1300     current_route->target.distance = htonl (ntohl (target->distance) + 1);
1301     send_distance_change_to_plugin (&target->peer, ntohl (target->distance) + 1);
1302     return GNUNET_OK;
1303   }
1304   /* new route */
1305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306               "Discovered new route to %s using %u hops\n",
1307               GNUNET_i2s (&target->peer),
1308               (unsigned int) (ntohl (target->distance) + 1));
1309   current_route = GNUNET_new (struct Route);
1310   current_route->next_hop = neighbor;
1311   current_route->target.peer = target->peer;
1312   current_route->target.distance = htonl (ntohl (target->distance) + 1);
1313   GNUNET_assert (GNUNET_YES ==
1314                  GNUNET_CONTAINER_multipeermap_put (all_routes,
1315                                                     &current_route->target.peer,
1316                                                     current_route,
1317                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1318   send_connect_to_plugin (&current_route->target.peer,
1319                           ntohl (current_route->target.distance));
1320   return GNUNET_OK;
1321 }
1322
1323
1324 /**
1325  * Callback for set operation results. Called for each element
1326  * in the result set.
1327  * We have learned a new route from the other peer.  Add it to the
1328  * route set we're building.
1329  *
1330  * @param cls the 'struct DirectNeighbor' we're building the consensus with
1331  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
1332  * @param status see enum GNUNET_SET_Status
1333  */
1334 static void
1335 handle_set_union_result (void *cls,
1336                          const struct GNUNET_SET_Element *element,
1337                          enum GNUNET_SET_Status status)
1338 {
1339   struct DirectNeighbor *neighbor = cls;
1340   struct Target *target;
1341
1342   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1343               "Got SET union result: %d\n",
1344               status);
1345   switch (status)
1346   {
1347   case GNUNET_SET_STATUS_OK:
1348     if (sizeof (struct Target) != element->size)
1349     {
1350       GNUNET_break_op (0);
1351       return;
1352     }
1353     target = GNUNET_new (struct Target);
1354     memcpy (target, element->data, sizeof (struct Target));
1355     if (GNUNET_YES !=
1356         GNUNET_CONTAINER_multipeermap_put (neighbor->neighbor_table_consensus,
1357                                            &target->peer,
1358                                            target,
1359                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1360     {
1361       GNUNET_break_op (0);
1362       GNUNET_free (target);
1363     }
1364     break;
1365   case GNUNET_SET_STATUS_TIMEOUT:
1366   case GNUNET_SET_STATUS_FAILURE:
1367     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1368                 "Failed to establish DV union, will try again later\n");
1369     neighbor->set_op = NULL;
1370     if (NULL != neighbor->neighbor_table_consensus)
1371     {
1372       GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table_consensus,
1373                                              &free_targets,
1374                                              NULL);
1375       GNUNET_CONTAINER_multipeermap_destroy (neighbor->neighbor_table_consensus);
1376       neighbor->neighbor_table_consensus = NULL;
1377     }
1378     if (0 < memcmp (&neighbor->peer,
1379                     &my_identity,
1380                     sizeof (struct GNUNET_PeerIdentity)))
1381       neighbor->initiate_task = GNUNET_SCHEDULER_add_delayed (GNUNET_DV_CONSENSUS_FREQUENCY,
1382                                                               &initiate_set_union,
1383                                                               neighbor);
1384     break;
1385   case GNUNET_SET_STATUS_HALF_DONE:
1386     /* we got all of our updates; integrate routing table! */
1387     neighbor->target_removed = GNUNET_NO;
1388     GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table,
1389                                            &check_target_removed,
1390                                            neighbor);
1391     if (GNUNET_YES == neighbor->target_removed)
1392     {
1393       /* check if we got an alternative for the removed routes */
1394       GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
1395                                              &refresh_routes,
1396                                              NULL);
1397     }
1398     /* add targets that appeared (and check for improved routes) */
1399     GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table_consensus,
1400                                            &check_target_added,
1401                                            neighbor);
1402     if (NULL != neighbor->neighbor_table)
1403     {
1404       GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table,
1405                                              &free_targets,
1406                                              NULL);
1407       GNUNET_CONTAINER_multipeermap_destroy (neighbor->neighbor_table);
1408       neighbor->neighbor_table = NULL;
1409     }
1410     neighbor->neighbor_table = neighbor->neighbor_table_consensus;
1411     neighbor->neighbor_table_consensus = NULL;
1412     break;
1413   case GNUNET_SET_STATUS_DONE:
1414     /* operation done, schedule next run! */
1415     neighbor->set_op = NULL;
1416     if (0 < memcmp (&neighbor->peer,
1417                     &my_identity,
1418                     sizeof (struct GNUNET_PeerIdentity)))
1419       neighbor->initiate_task = GNUNET_SCHEDULER_add_delayed (GNUNET_DV_CONSENSUS_FREQUENCY,
1420                                                               &initiate_set_union,
1421                                                               neighbor);
1422     break;
1423   default:
1424     GNUNET_break (0);
1425     return;
1426   }
1427 }
1428
1429
1430 /**
1431  * Start creating a new DV set union construction, our neighbour has
1432  * asked for it (callback for listening peer).
1433  *
1434  * @param cls the 'struct DirectNeighbor' of the peer we're building
1435  *        a routing consensus with
1436  * @param other_peer the other peer
1437  * @param context_msg message with application specific information from
1438  *        the other peer
1439  * @param request request from the other peer, use GNUNET_SET_accept
1440  *        to accept it, otherwise the request will be refused
1441  *        Note that we don't use a return value here, as it is also
1442  *        necessary to specify the set we want to do the operation with,
1443  *        whith sometimes can be derived from the context message.
1444  *        Also necessary to specify the timeout.
1445  */
1446 static void
1447 listen_set_union (void *cls,
1448                   const struct GNUNET_PeerIdentity *other_peer,
1449                   const struct GNUNET_MessageHeader *context_msg,
1450                   struct GNUNET_SET_Request *request)
1451 {
1452   struct DirectNeighbor *neighbor = cls;
1453
1454   if (NULL == request)
1455     return; /* why??? */
1456   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1457               "Starting to create consensus with %s!\n",
1458               GNUNET_i2s (&neighbor->peer));
1459   if (NULL != neighbor->set_op)
1460   {
1461     GNUNET_SET_operation_cancel (neighbor->set_op);
1462     neighbor->set_op = NULL;
1463   }
1464   if (NULL != neighbor->my_set)
1465   {
1466     GNUNET_SET_destroy (neighbor->my_set);
1467     neighbor->my_set = NULL;
1468   }
1469   neighbor->my_set = GNUNET_SET_create (cfg,
1470                                         GNUNET_SET_OPERATION_UNION);
1471   neighbor->set_op = GNUNET_SET_accept (request,
1472                                         GNUNET_SET_RESULT_ADDED,
1473                                         &handle_set_union_result,
1474                                         neighbor);
1475   neighbor->consensus_insertion_offset = 0;
1476   neighbor->consensus_insertion_distance = 0;
1477   neighbor->consensus_elements = 0;
1478   build_set (neighbor);
1479 }
1480
1481
1482 /**
1483  * Start creating a new DV set union by initiating the connection.
1484  *
1485  * @param cls the 'struct DirectNeighbor' of the peer we're building
1486  *        a routing consensus with
1487  * @param tc scheduler context
1488  */
1489 static void
1490 initiate_set_union (void *cls,
1491                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1492 {
1493   struct DirectNeighbor *neighbor = cls;
1494
1495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1496               "Initiating SET union\n");
1497   neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
1498   neighbor->my_set = GNUNET_SET_create (cfg,
1499                                         GNUNET_SET_OPERATION_UNION);
1500   neighbor->set_op = GNUNET_SET_prepare (&neighbor->peer,
1501                                          &neighbor->real_session_id,
1502                                          NULL,
1503                                          0 /* FIXME: salt */,
1504                                          GNUNET_SET_RESULT_ADDED,
1505                                          &handle_set_union_result,
1506                                          neighbor);
1507   neighbor->consensus_insertion_offset = 0;
1508   neighbor->consensus_insertion_distance = 0;
1509   neighbor->consensus_elements = 0;
1510   build_set (neighbor);
1511 }
1512
1513
1514 /**
1515  * Core handler for DV data messages.  Whatever this message
1516  * contains all we really have to do is rip it out of its
1517  * DV layering and give it to our pal the DV plugin to report
1518  * in with.
1519  *
1520  * @param cls closure
1521  * @param peer peer which sent the message (immediate sender)
1522  * @param message the message
1523  * @return GNUNET_OK on success, GNUNET_SYSERR if the other peer violated the protocol
1524  */
1525 static int
1526 handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
1527                          const struct GNUNET_MessageHeader *message)
1528 {
1529   const struct RouteMessage *rm;
1530   const struct GNUNET_MessageHeader *payload;
1531   struct Route *route;
1532
1533   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534               "Handling DV message\n");
1535   if (ntohs (message->size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader))
1536   {
1537     GNUNET_break_op (0);
1538     return GNUNET_SYSERR;
1539   }
1540   rm = (const struct RouteMessage *) message;
1541   payload = (const struct GNUNET_MessageHeader *) &rm[1];
1542   if (ntohs (message->size) != sizeof (struct RouteMessage) + ntohs (payload->size))
1543   {
1544     GNUNET_break_op (0);
1545     return GNUNET_SYSERR;
1546   }
1547   if (0 == memcmp (&rm->target,
1548                    &my_identity,
1549                    sizeof (struct GNUNET_PeerIdentity)))
1550   {
1551     /* message is for me, check reverse route! */
1552     route = GNUNET_CONTAINER_multipeermap_get (all_routes,
1553                                                &rm->sender);
1554     if (NULL == route)
1555     {
1556       /* don't have reverse route, drop */
1557       GNUNET_STATISTICS_update (stats,
1558                                 "# message discarded (no reverse route)",
1559                                 1, GNUNET_NO);
1560       return GNUNET_OK;
1561     }
1562     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1563                 "Delivering %u bytes to myself!\n",
1564                 ntohs (payload->size));
1565     send_data_to_plugin (payload,
1566                          &rm->sender,
1567                          ntohl (route->target.distance));
1568     return GNUNET_OK;
1569   }
1570   route = GNUNET_CONTAINER_multipeermap_get (all_routes,
1571                                              &rm->target);
1572   if (NULL == route)
1573   {
1574     GNUNET_STATISTICS_update (stats,
1575                               "# messages discarded (no route)",
1576                               1, GNUNET_NO);
1577     return GNUNET_OK;
1578   }
1579   if (ntohl (route->target.distance) > ntohl (rm->distance) + 1)
1580   {
1581     GNUNET_STATISTICS_update (stats,
1582                               "# messages discarded (target too far)",
1583                               1, GNUNET_NO);
1584     return GNUNET_OK;
1585   }
1586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1587               "Forwarding message to %s\n",
1588               GNUNET_i2s (&rm->target));
1589   forward_payload (route->next_hop,
1590                    ntohl (route->target.distance),
1591                    0,
1592                    &rm->target,
1593                    &rm->sender,
1594                    payload);
1595   return GNUNET_OK;
1596 }
1597
1598
1599 /**
1600  * Service server's handler for message send requests (which come
1601  * bubbling up to us through the DV plugin).
1602  *
1603  * @param cls closure
1604  * @param client identification of the client
1605  * @param message the actual message
1606  */
1607 static void
1608 handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
1609                         const struct GNUNET_MessageHeader *message)
1610 {
1611   struct Route *route;
1612   const struct GNUNET_DV_SendMessage *msg;
1613   const struct GNUNET_MessageHeader *payload;
1614
1615   if (ntohs (message->size) < sizeof (struct GNUNET_DV_SendMessage) + sizeof (struct GNUNET_MessageHeader))
1616   {
1617     GNUNET_break (0);
1618     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1619     return;
1620   }
1621   msg = (const struct GNUNET_DV_SendMessage *) message;
1622   GNUNET_break (0 != ntohl (msg->uid));
1623   payload = (const struct GNUNET_MessageHeader *) &msg[1];
1624   if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size))
1625   {
1626     GNUNET_break (0);
1627     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1628     return;
1629   }
1630   route = GNUNET_CONTAINER_multipeermap_get (all_routes,
1631                                              &msg->target);
1632   if (NULL == route)
1633   {
1634     /* got disconnected */
1635     GNUNET_STATISTICS_update (stats,
1636                               "# local messages discarded (no route)",
1637                               1, GNUNET_NO);
1638     send_ack_to_plugin (&msg->target, ntohl (msg->uid), GNUNET_YES);
1639     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1640     return;
1641   }
1642   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1643               "Forwarding %u bytes to %s\n",
1644               ntohs (payload->size),
1645               GNUNET_i2s (&msg->target));
1646
1647   forward_payload (route->next_hop,
1648                    ntohl (route->target.distance),
1649                    htonl (msg->uid),
1650                    &msg->target,
1651                    &my_identity,
1652                    payload);
1653   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1654 }
1655
1656
1657 /**
1658  * Cleanup all of the data structures associated with a given neighbor.
1659  *
1660  * @param neighbor neighbor to clean up
1661  */
1662 static void
1663 cleanup_neighbor (struct DirectNeighbor *neighbor)
1664 {
1665   struct PendingMessage *pending;
1666
1667   while (NULL != (pending = neighbor->pm_head))
1668   {
1669     neighbor->pm_queue_size--;
1670     GNUNET_CONTAINER_DLL_remove (neighbor->pm_head,
1671                                  neighbor->pm_tail,
1672                                  pending);
1673     GNUNET_free (pending);
1674   }
1675   handle_direct_disconnect (neighbor);
1676   GNUNET_assert (GNUNET_YES ==
1677                  GNUNET_CONTAINER_multipeermap_remove (direct_neighbors,
1678                                                        &neighbor->peer,
1679                                                        neighbor));
1680   GNUNET_free (neighbor);
1681 }
1682
1683
1684 /**
1685  * Method called whenever a given peer disconnects.
1686  *
1687  * @param cls closure
1688  * @param peer peer identity this notification is about
1689  */
1690 static void
1691 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
1692 {
1693   struct DirectNeighbor *neighbor;
1694
1695   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696               "Received core peer disconnect message for peer `%s'!\n",
1697               GNUNET_i2s (peer));
1698   /* Check for disconnect from self message */
1699   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
1700     return;
1701   neighbor =
1702       GNUNET_CONTAINER_multipeermap_get (direct_neighbors, peer);
1703   if (NULL == neighbor)
1704   {
1705     GNUNET_break (0);
1706     return;
1707   }
1708   GNUNET_break (GNUNET_YES == neighbor->connected);
1709   neighbor->connected = GNUNET_NO;
1710   if (DIRECT_NEIGHBOR_COST == neighbor->distance)
1711   {
1712     GNUNET_STATISTICS_update (stats,
1713                               "# peers connected (1-hop)",
1714                               -1, GNUNET_NO);
1715   }
1716   cleanup_neighbor (neighbor);
1717   GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
1718                                          &refresh_routes,
1719                                          NULL);
1720 }
1721
1722
1723 /**
1724  * Multipeermap iterator for freeing routes.  Should never be called.
1725  *
1726  * @param cls NULL
1727  * @param key key value stored under
1728  * @param value the route to be freed
1729  *
1730  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
1731  */
1732 static int
1733 free_route (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
1734 {
1735   struct Route *route = value;
1736
1737   GNUNET_break (0);
1738   GNUNET_assert (GNUNET_YES ==
1739                  GNUNET_CONTAINER_multipeermap_remove (all_routes, key, value));
1740   release_route (route);
1741   send_disconnect_to_plugin (&route->target.peer);
1742   GNUNET_free (route);
1743   return GNUNET_YES;
1744 }
1745
1746
1747 /**
1748  * Multipeermap iterator for freeing direct neighbors. Should never be called.
1749  *
1750  * @param cls NULL
1751  * @param key key value stored under
1752  * @param value the direct neighbor to be freed
1753  *
1754  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
1755  */
1756 static int
1757 free_direct_neighbors (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
1758 {
1759   struct DirectNeighbor *neighbor = value;
1760
1761   GNUNET_break (0);
1762   cleanup_neighbor (neighbor);
1763   return GNUNET_YES;
1764 }
1765
1766
1767 /**
1768  * Task run during shutdown.
1769  *
1770  * @param cls unused
1771  * @param tc unused
1772  */
1773 static void
1774 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1775 {
1776   unsigned int i;
1777
1778   GNUNET_CORE_disconnect (core_api);
1779   core_api = NULL;
1780   GNUNET_ATS_performance_done (ats);
1781   ats = NULL;
1782   GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
1783                                          &free_direct_neighbors, NULL);
1784   GNUNET_CONTAINER_multipeermap_iterate (all_routes,
1785                                          &free_route, NULL);
1786   GNUNET_CONTAINER_multipeermap_destroy (direct_neighbors);
1787   GNUNET_CONTAINER_multipeermap_destroy (all_routes);
1788   GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1789   stats = NULL;
1790   GNUNET_SERVER_notification_context_destroy (nc);
1791   nc = NULL;
1792   for (i=0;i<DEFAULT_FISHEYE_DEPTH - 1;i++)
1793     GNUNET_array_grow (consensi[i].targets,
1794                        consensi[i].array_length,
1795                        0);
1796 }
1797
1798
1799 /**
1800  * Notify newly connected client about an existing route.
1801  *
1802  * @param cls the 'struct GNUNET_SERVER_Client'
1803  * @param key peer identity
1804  * @param value the XXX.
1805  * @return GNUNET_OK (continue to iterate)
1806  */
1807 static int
1808 add_route (void *cls,
1809            const struct GNUNET_PeerIdentity *key,
1810            void *value)
1811 {
1812   struct GNUNET_SERVER_Client *client = cls;
1813   struct Route *route = value;
1814   struct GNUNET_DV_ConnectMessage cm;
1815
1816   cm.header.size = htons (sizeof (cm));
1817   cm.header.type = htons (GNUNET_MESSAGE_TYPE_DV_CONNECT);
1818   cm.distance = htonl (route->target.distance);
1819   cm.peer = route->target.peer;
1820
1821   GNUNET_SERVER_notification_context_unicast (nc,
1822                                               client,
1823                                               &cm.header,
1824                                               GNUNET_NO);
1825   return GNUNET_OK;
1826 }
1827
1828
1829 /**
1830  * Handle START-message.  This is the first message sent to us
1831  * by the client (can only be one!).
1832  *
1833  * @param cls closure (always NULL)
1834  * @param client identification of the client
1835  * @param message the actual message
1836  */
1837 static void
1838 handle_start (void *cls, struct GNUNET_SERVER_Client *client,
1839               const struct GNUNET_MessageHeader *message)
1840 {
1841   GNUNET_SERVER_notification_context_add (nc, client);
1842   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1843   GNUNET_CONTAINER_multipeermap_iterate (all_routes,
1844                                          &add_route,
1845                                          client);
1846 }
1847
1848
1849 /**
1850  * Called on core init.
1851  *
1852  * @param cls unused
1853  * @param identity this peer's identity
1854  */
1855 static void
1856 core_init (void *cls,
1857            const struct GNUNET_PeerIdentity *identity)
1858 {
1859   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1860               "I am peer: %s\n",
1861               GNUNET_i2s (identity));
1862   my_identity = *identity;
1863 }
1864
1865
1866 /**
1867  * Process dv requests.
1868  *
1869  * @param cls closure
1870  * @param server the initialized server
1871  * @param c configuration to use
1872  */
1873 static void
1874 run (void *cls, struct GNUNET_SERVER_Handle *server,
1875      const struct GNUNET_CONFIGURATION_Handle *c)
1876 {
1877   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1878     {&handle_dv_route_message, GNUNET_MESSAGE_TYPE_DV_ROUTE, 0},
1879     {NULL, 0, 0}
1880   };
1881   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
1882     {&handle_start, NULL,
1883      GNUNET_MESSAGE_TYPE_DV_START,
1884      sizeof (struct GNUNET_MessageHeader) },
1885     { &handle_dv_send_message, NULL,
1886       GNUNET_MESSAGE_TYPE_DV_SEND,
1887       0},
1888     {NULL, NULL, 0, 0}
1889   };
1890
1891   cfg = c;
1892   direct_neighbors = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
1893   all_routes = GNUNET_CONTAINER_multipeermap_create (65536, GNUNET_NO);
1894   core_api = GNUNET_CORE_connect (cfg, NULL,
1895                                   &core_init,
1896                                   &handle_core_connect,
1897                                   &handle_core_disconnect,
1898                                   NULL, GNUNET_NO,
1899                                   NULL, GNUNET_NO,
1900                                   core_handlers);
1901
1902   if (NULL == core_api)
1903     return;
1904   ats = GNUNET_ATS_performance_init (cfg, &handle_ats_update, NULL);
1905   if (NULL == ats)
1906   {
1907     GNUNET_CORE_disconnect (core_api);
1908     return;
1909   }
1910   nc = GNUNET_SERVER_notification_context_create (server,
1911                                                   MAX_QUEUE_SIZE_PLUGIN);
1912   stats = GNUNET_STATISTICS_create ("dv", cfg);
1913   GNUNET_SERVER_add_handlers (server, plugin_handlers);
1914   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1915                                 &shutdown_task, NULL);
1916 }
1917
1918
1919 /**
1920  * The main function for the dv service.
1921  *
1922  * @param argc number of arguments from the command line
1923  * @param argv command line arguments
1924  * @return 0 ok, 1 on error
1925  */
1926 int
1927 main (int argc, char *const *argv)
1928 {
1929   return (GNUNET_OK ==
1930           GNUNET_SERVICE_run (argc, argv, "dv", GNUNET_SERVICE_OPTION_NONE,
1931                               &run, NULL)) ? 0 : 1;
1932 }
1933
1934 /* end of gnunet-service-dv.c */