2bf923ccc24ed03347e0fc7627081bb251c74b60
[oweals/gnunet.git] / src / dv / gnunet-service-dv.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dv/gnunet-service-dv.c
23  * @brief the distance vector service, primarily handles gossip of nearby
24  * peers and sending/receiving DV messages from core and decapsulating
25  * them
26  *
27  * @author Christian Grothoff
28  * @author Nathan Evans
29  */
30 #include "platform.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_peerinfo_service.h"
36 #include "gnunet_statistics_service.h"
37 #include "gnunet_consensus_service.h"
38 #include "dv.h"
39
40 /**
41  * How often do we establish the consensu?
42  */
43 #define GNUNET_DV_CONSENSUS_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5))
44
45 /**
46  * Maximum number of messages we queue per peer.
47  */
48 #define MAX_QUEUE_SIZE 16
49
50 /**
51  * The default fisheye depth, from how many hops away will
52  * we keep peers?
53  */
54 #define DEFAULT_FISHEYE_DEPTH 3
55
56 /**
57  * How many hops is a direct neighbor away?
58  */
59 #define DIRECT_NEIGHBOR_COST 1
60
61
62 GNUNET_NETWORK_STRUCT_BEGIN
63
64 /**
65  * Information about a peer DV can route to.  These entries are what
66  * we use as the binary format to establish consensus to create our
67  * routing table and as the address format in the HELLOs.
68  */
69 struct Target
70 {
71
72   /**
73    * Identity of the peer we can reach.
74    */
75   struct GNUNET_PeerIdentity peer;
76
77   /**
78    * How many hops (1-3) is this peer away?
79    */
80   uint32_t distance GNUNET_PACKED;
81
82 };
83
84
85 /**
86  * Message exchanged between DV services (via core), requesting a
87  * message to be routed.  
88  */
89 struct RouteMessage
90 {
91   /**
92    * Type: GNUNET_MESSAGE_TYPE_DV_ROUTE
93    */
94   struct GNUNET_MessageHeader header;
95
96   /**
97    * Expected (remaining) distance.  Must be always smaller than
98    * DEFAULT_FISHEYE_DEPTH, should be zero at the target.  Must
99    * be decremented by one at each hop.  Peers must not forward
100    * these messages further once the counter has reached zero.
101    */
102   uint32_t distance GNUNET_PACKED;
103
104   /**
105    * The (actual) target of the message (this peer, if distance is zero).
106    */
107   struct GNUNET_PeerIdentity target;
108
109   /**
110    * The (actual) sender of the message.
111    */
112   struct GNUNET_PeerIdentity sender;
113
114 };
115
116 GNUNET_NETWORK_STRUCT_END
117
118
119 /**
120  * Linked list of messages to send to clients.
121  */
122 struct PendingMessage
123 {
124   /**
125    * Pointer to next item in the list
126    */
127   struct PendingMessage *next;
128
129   /**
130    * Pointer to previous item in the list
131    */
132   struct PendingMessage *prev;
133
134   /**
135    * Actual message to be sent, allocated after this struct.
136    */
137   const struct GNUNET_MessageHeader *msg;
138
139   /**
140    * Ultimate target for the message.
141    */
142   struct GNUNET_PeerIdentity ultimate_target;
143
144   /**
145    * Unique ID of the message.
146    */
147   uint32_t uid;
148
149 };
150
151
152 /**
153  * Information about a direct neighbor (core-level, excluding
154  * DV-links, only DV-enabled peers).
155  */
156 struct DirectNeighbor
157 {
158
159   /**
160    * Identity of the peer.
161    */
162   struct GNUNET_PeerIdentity peer;
163   
164   /**
165    * Head of linked list of messages to send to this peer.
166    */
167   struct PendingMessage *pm_head;
168
169   /**
170    * Tail of linked list of messages to send to this peer.
171    */
172   struct PendingMessage *pm_tail;
173
174   /**
175    * Transmit handle to core service.
176    */
177   struct GNUNET_CORE_TransmitHandle *cth;
178
179   /**
180    * Routing table of the neighbor, NULL if not yet established.
181    * Keys are peer identities, values are 'struct Target' entries.
182    * Note that the distances in the targets are from the point-of-view
183    * of the peer, not from us!
184    */ 
185   struct GNUNET_CONTAINER_MultiHashMap *neighbor_table;
186
187   /**
188    * Updated routing table of the neighbor, under construction,
189    * NULL if we are not currently building it.
190    * Keys are peer identities, values are 'struct Target' entries.
191    * Note that the distances in the targets are from the point-of-view
192    * of the peer, not from us!
193    */ 
194   struct GNUNET_CONTAINER_MultiHashMap *neighbor_table_consensus;
195
196   /**
197    * Active consensus, if we are currently synchronizing the
198    * routing tables.
199    */
200   struct GNUNET_CONSENSUS_Handle *consensus;
201
202   /**
203    * ID of the task we use to (periodically) update our consensus
204    * with this peer.
205    */
206   GNUNET_SCHEDULER_TaskIdentifier consensus_task;
207
208   /**
209    * At what offset are we, with respect to inserting our own routes
210    * into the consensus?
211    */
212   unsigned int consensus_insertion_offset;
213
214   /**
215    * At what distance are we, with respect to inserting our own routes
216    * into the consensus?
217    */
218   unsigned int consensus_insertion_distance;
219
220   /**
221    * Number of messages currently in the 'pm_XXXX'-DLL.
222    */
223   unsigned int pm_queue_size;
224
225 };
226
227
228 /**
229  * A route includes information about the next hop,
230  * the target, and the ultimate distance to the
231  * target.
232  */
233 struct Route
234 {
235
236   /**
237    * Which peer do we need to forward the message to?
238    */
239   struct DirectNeighbor *next_hop;
240
241   /**
242    * What would be the target, and how far is it away?
243    */
244   struct Target target;
245
246   /**
247    * Offset of this target in the respective consensus set.
248    */
249   unsigned int set_offset;
250
251 };
252
253
254 /**
255  * Set of targets we bring to a consensus; all targets in a set have a
256  * distance equal to the sets distance (which is implied by the array
257  * index of the set).
258  */
259 struct ConsensusSet
260 {
261
262   /**
263    * Array of targets in the set, may include NULL entries if a
264    * neighbor has disconnected; the targets are allocated with the
265    * respective container (all_routes), not here.
266    */
267   struct Route **targets;
268
269   /**
270    * Size of the 'targets' array.
271    */
272   unsigned int array_length;
273
274 };
275
276
277 /**
278  * Hashmap of all of our direct neighbors (no DV routing).
279  */
280 static struct GNUNET_CONTAINER_MultiHashMap *direct_neighbors;
281
282 /**
283  * Hashmap with all routes that we currently support; contains 
284  * routing information for all peers from distance 2
285  * up to distance DEFAULT_FISHEYE_DEPTH.
286  */
287 static struct GNUNET_CONTAINER_MultiHashMap *all_routes;
288
289 /**
290  * Array of consensus sets we expose to the outside world.  Sets
291  * are structured by the distance to the target.
292  */
293 static struct ConsensusSet consensi[DEFAULT_FISHEYE_DEPTH - 1];
294
295 /**
296  * Handle to the core service api.
297  */
298 static struct GNUNET_CORE_Handle *core_api;
299
300 /**
301  * The identity of our peer.
302  */
303 static struct GNUNET_PeerIdentity my_identity;
304
305 /**
306  * The configuration for this service.
307  */
308 static const struct GNUNET_CONFIGURATION_Handle *cfg;
309
310 /**
311  * The client, the DV plugin connected to us.  Hopefully
312  * this client will never change, although if the plugin dies
313  * and returns for some reason it may happen.
314  */
315 static struct GNUNET_SERVER_Client *client_handle;
316
317 /**
318  * Transmit handle to the plugin.
319  */
320 static struct GNUNET_SERVER_TransmitHandle *plugin_transmit_handle;
321
322 /**
323  * Head of DLL for client messages
324  */
325 static struct PendingMessage *plugin_pending_head;
326
327 /**
328  * Tail of DLL for client messages
329  */
330 static struct PendingMessage *plugin_pending_tail;
331
332 /**
333  * Handle for the statistics service.
334  */
335 struct GNUNET_STATISTICS_Handle *stats;
336
337
338 /**
339  * Get distance information from 'atsi'.
340  *
341  * @param atsi performance data
342  * @param atsi_count number of entries in atsi
343  * @return connected transport distance
344  */
345 static uint32_t
346 get_atsi_distance (const struct GNUNET_ATS_Information *atsi,
347                    unsigned int atsi_count)
348 {
349   unsigned int i;
350
351   for (i = 0; i < atsi_count; i++)
352     if (ntohl (atsi[i].type) == GNUNET_ATS_QUALITY_NET_DISTANCE)
353       return ntohl (atsi->value);
354   /* FIXME: we do not have distance data? Assume direct neighbor. */
355   return DIRECT_NEIGHBOR_COST;
356 }
357
358
359 /**
360  * Function called to notify a client about the socket
361  * begin ready to queue more data.  "buf" will be
362  * NULL and "size" zero if the socket was closed for
363  * writing in the meantime.
364  *
365  * @param cls closure
366  * @param size number of bytes available in buf
367  * @param buf where the callee should write the message
368  * @return number of bytes written to buf
369  */
370 static size_t
371 transmit_to_plugin (void *cls, size_t size, void *buf)
372 {
373   char *cbuf = buf;
374   struct PendingMessage *reply;
375   size_t off;
376   size_t msize;
377
378   plugin_transmit_handle = NULL;
379   if (NULL == buf)
380   {
381     /* client disconnected */    
382     return 0;
383   }
384   off = 0;
385   while ( (NULL != (reply = plugin_pending_head)) &&
386           (size >= off + (msize = ntohs (reply->msg->size))))
387   {
388     GNUNET_CONTAINER_DLL_remove (plugin_pending_head, plugin_pending_tail,
389                                  reply);
390     memcpy (&cbuf[off], reply->msg, msize);
391     GNUNET_free (reply);
392     off += msize;
393   }
394   if (NULL != plugin_pending_head)
395     plugin_transmit_handle =
396       GNUNET_SERVER_notify_transmit_ready (client_handle,
397                                            msize,
398                                            GNUNET_TIME_UNIT_FOREVER_REL,
399                                            &transmit_to_plugin, NULL);
400   return off;
401 }
402
403
404 /**
405  * Forward a message from another peer to the plugin.
406  *
407  * @param message the message to send to the plugin
408  * @param distant_neighbor the original sender of the message
409  * @param distnace distance to the original sender of the message
410  */
411 static void
412 send_data_to_plugin (const struct GNUNET_MessageHeader *message, 
413                      const struct GNUNET_PeerIdentity *distant_neighbor, 
414                      uint32_t distance)
415 {
416   struct GNUNET_DV_ReceivedMessage *received_msg;
417   struct PendingMessage *pending_message;
418   size_t size;
419
420   if (NULL == client_handle)
421   {
422     GNUNET_STATISTICS_update (stats,
423                               "# messages discarded (no plugin)",
424                               1, GNUNET_NO);
425     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
426                 _("Refusing to queue messages, DV plugin not active.\n"));
427     return;
428   }
429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
430               "Delivering message from peer `%s'\n",
431               GNUNET_i2s (distant_neighbor));
432   size = sizeof (struct GNUNET_DV_ReceivedMessage) + 
433     ntohs (message->size);
434   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
435   {    
436     GNUNET_break (0); /* too big */
437     return;
438   }
439   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + size);
440   received_msg = (struct GNUNET_DV_ReceivedMessage *) &pending_message[1];
441   received_msg->header.size = htons (size);
442   received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV);
443   received_msg->distance = htonl (distance);
444   received_msg->sender = *distant_neighbor;
445   memcpy (&received_msg[1], message, ntohs (message->size));
446   GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head, 
447                                     plugin_pending_tail,
448                                     pending_message);  
449   if (NULL == plugin_transmit_handle)
450     plugin_transmit_handle =
451       GNUNET_SERVER_notify_transmit_ready (client_handle, size,
452                                            GNUNET_TIME_UNIT_FOREVER_REL,
453                                            &transmit_to_plugin, NULL);
454 }
455
456
457 /**
458  * Forward a control message to the plugin.
459  *
460  * @param message the message to send to the plugin
461  * @param distant_neighbor the original sender of the message
462  * @param distnace distance to the original sender of the message
463  */
464 static void
465 send_control_to_plugin (const struct GNUNET_MessageHeader *message)
466 {
467   struct PendingMessage *pending_message;
468   size_t size;
469
470   if (NULL == client_handle)
471   {
472     GNUNET_STATISTICS_update (stats,
473                               "# control messages discarded (no plugin)",
474                               1, GNUNET_NO);
475     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
476                 _("Refusing to queue messages, DV plugin not active.\n"));
477     return;
478   }
479   size = ntohs (message->size);
480   pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + size);
481   memcpy (&pending_message[1], message, size);
482   GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head, 
483                                     plugin_pending_tail,
484                                     pending_message);  
485   if (NULL == plugin_transmit_handle)
486     plugin_transmit_handle =
487       GNUNET_SERVER_notify_transmit_ready (client_handle, size,
488                                            GNUNET_TIME_UNIT_FOREVER_REL,
489                                            &transmit_to_plugin, NULL);
490 }
491
492
493 /**
494  * Give an ACK message to the plugin, we transmitted a message for it.
495  *
496  * @param target peer that received the message
497  * @param uid plugin-chosen UID for the message
498  */
499 static void
500 send_ack_to_plugin (const struct GNUNET_PeerIdentity *target, 
501                     uint32_t uid)
502 {
503   struct GNUNET_DV_AckMessage ack_msg;
504
505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506               "Delivering ACK for message to peer `%s'\n",
507               GNUNET_i2s (target));
508   ack_msg.header.size = htons (sizeof (ack_msg));
509   ack_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND_ACK);
510   ack_msg.uid = htonl (uid);
511   ack_msg.target = *target;
512   send_control_to_plugin (&ack_msg.header);
513 }
514
515
516 /**
517  * Give a CONNECT message to the plugin.
518  *
519  * @param target peer that connected
520  * @param distance distance to the target
521  */
522 static void
523 send_connect_to_plugin (const struct GNUNET_PeerIdentity *target, 
524                         uint32_t distance)
525 {
526   struct GNUNET_DV_ConnectMessage cm;
527
528   if (NULL == client_handle)
529     return;
530   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
531               "Delivering CONNECT about peer `%s'\n",
532               GNUNET_i2s (target));
533   cm.header.size = htons (sizeof (cm));
534   cm.header.type = htons (GNUNET_MESSAGE_TYPE_DV_CONNECT);
535   cm.distance = htonl (distance);
536   cm.peer = *target;
537   send_control_to_plugin (&cm.header);
538 }
539
540
541 /**
542  * Give a DISCONNECT message to the plugin.
543  *
544  * @param target peer that disconnected
545  */
546 static void
547 send_disconnect_to_plugin (const struct GNUNET_PeerIdentity *target)
548 {
549   struct GNUNET_DV_DisconnectMessage dm;
550
551   if (NULL == client_handle)
552     return;
553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554               "Delivering DISCONNECT about peer `%s'\n",
555               GNUNET_i2s (target));
556   dm.header.size = htons (sizeof (dm));
557   dm.header.type = htons (GNUNET_MESSAGE_TYPE_DV_DISCONNECT);
558   dm.reserved = htonl (0);
559   dm.peer = *target;
560   send_control_to_plugin (&dm.header);
561 }
562
563
564 /**
565  * Function called to transfer a message to another peer
566  * via core.
567  *
568  * @param cls closure with the direct neighbor
569  * @param size number of bytes available in buf
570  * @param buf where the callee should write the message
571  * @return number of bytes written to buf
572  */
573 static size_t
574 core_transmit_notify (void *cls, size_t size, void *buf)
575 {
576   struct DirectNeighbor *dn = cls;
577   char *cbuf = buf;
578   struct PendingMessage *pending;
579   size_t off;
580   size_t msize;
581
582   dn->cth = NULL;
583   if (NULL == buf)
584   {
585     /* peer disconnected */
586     return 0;
587   }
588   off = 0;
589   pending = dn->pm_head;
590   off = 0;
591   while ( (NULL != (pending = dn->pm_head)) &&
592           (size >= off + (msize = ntohs (pending->msg->size))))
593   {
594     dn->pm_queue_size--;
595     GNUNET_CONTAINER_DLL_remove (dn->pm_head,
596                                  dn->pm_tail,
597                                  pending);
598     memcpy (&cbuf[off], pending->msg, msize);
599     send_ack_to_plugin (&pending->ultimate_target,
600                         pending->uid);
601     GNUNET_free (pending);
602     off += msize;
603   }
604   if (NULL != dn->pm_head)
605     dn->cth =
606       GNUNET_CORE_notify_transmit_ready (core_api,
607                                          GNUNET_YES /* cork */,
608                                          0 /* priority */,
609                                          GNUNET_TIME_UNIT_FOREVER_REL,
610                                          &dn->peer,
611                                          msize,                                  
612                                          &core_transmit_notify, dn);
613   return off;
614 }
615
616
617 /**
618  * Forward the given payload to the given target.
619  *
620  * @param target where to send the message
621  * @param distance expected (remaining) distance to the target
622  * @param sender original sender of the message
623  * @param payload payload of the message
624  */
625 static void
626 forward_payload (struct DirectNeighbor *target,
627                  uint32_t distance,
628                  const struct GNUNET_PeerIdentity *sender,
629                  const struct GNUNET_MessageHeader *payload)
630 {
631   struct PendingMessage *pm;
632   struct RouteMessage *rm;
633   size_t msize;
634
635   if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
636        (0 != memcmp (sender,
637                      &my_identity,
638                      sizeof (struct GNUNET_PeerIdentity))) )
639     return;
640   msize = sizeof (struct RouteMessage) + ntohs (payload->size);
641   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
642   {
643     GNUNET_break (0);
644     return;
645   }
646   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
647   pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
648   rm = (struct RouteMessage *) &pm[1];
649   rm->header.size = htons ((uint16_t) msize);
650   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_ROUTE);
651   rm->distance = htonl (distance);
652   rm->target = target->peer;
653   rm->sender = *sender;
654   memcpy (&rm[1], payload, ntohs (payload->size));
655   GNUNET_CONTAINER_DLL_insert_tail (target->pm_head,
656                                     target->pm_tail,
657                                     pm);
658   target->pm_queue_size++;
659   if (NULL == target->cth)
660     target->cth = GNUNET_CORE_notify_transmit_ready (core_api,
661                                                      GNUNET_YES /* cork */,
662                                                      0 /* priority */,
663                                                      GNUNET_TIME_UNIT_FOREVER_REL,
664                                                      &target->peer,
665                                                      msize,                                      
666                                                      &core_transmit_notify, target);
667 }
668
669
670 /**
671  * Find a free slot for storing a 'route' in the 'consensi'
672  * set at the given distance.
673  *
674  * @param distance distance to use for the set slot
675  */
676 static unsigned int
677 get_consensus_slot (uint32_t distance)
678 {
679   struct ConsensusSet *cs;
680   unsigned int i;
681
682   cs = &consensi[distance];
683   i = 0;
684   while ( (i < cs->array_length) &&
685           (NULL != cs->targets[i]) ) i++;
686   if (i == cs->array_length)
687     GNUNET_array_grow (cs->targets,
688                        cs->array_length,
689                        cs->array_length * 2 + 2);
690   return i;
691 }
692
693
694 /**
695  * Allocate a slot in the consensus set for a route.
696  *
697  * @param route route to initialize
698  * @param distance which consensus set to use
699  */
700 static void
701 allocate_route (struct Route *route,
702                 uint32_t distance)
703 {
704   unsigned int i;
705
706   i = get_consensus_slot (distance);
707   route->set_offset = i;
708   consensi[distance].targets[i] = route;
709   route->target.distance = distance;
710 }
711
712
713 /**
714  * Release a slot in the consensus set for a route.
715  *
716  * @param route route to release the slot from
717  */
718 static void
719 release_route (struct Route *route)
720 {
721   consensi[route->target.distance].targets[route->set_offset] = NULL;
722   route->set_offset = UINT_MAX; /* indicate invalid slot */
723 }
724
725
726 /**
727  * Move a route from one consensus set to another.
728  *
729  * @param route route to move
730  * @param new_distance new distance for the route (destination set)
731  */
732 static void
733 move_route (struct Route *route,
734             uint32_t new_distance)
735 {
736   unsigned int i;
737
738   release_route (route);
739   i = get_consensus_slot (new_distance);
740   route->set_offset = i;
741   consensi[new_distance].targets[i] = route;     
742   route->target.distance = new_distance;
743 }
744
745
746 /**
747  * Method called whenever a peer connects.
748  *
749  * @param cls closure
750  * @param peer peer identity this notification is about
751  * @param atsi performance data
752  * @param atsi_count number of entries in atsi
753  */
754 static void
755 handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
756                      const struct GNUNET_ATS_Information *atsi,
757                      unsigned int atsi_count)
758 {
759   struct DirectNeighbor *neighbor;
760   struct Route *route;
761   uint32_t distance;
762
763   /* Check for connect to self message */
764   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
765     return;
766   distance = get_atsi_distance (atsi, atsi_count);
767   neighbor = GNUNET_CONTAINER_multihashmap_get (direct_neighbors, 
768                                                 &peer->hashPubKey);
769   if (NULL != neighbor)
770   {
771     GNUNET_break (0);
772     return;
773   }
774   if (DIRECT_NEIGHBOR_COST != distance) 
775     return; /* is a DV-neighbor */
776   neighbor = GNUNET_malloc (sizeof (struct DirectNeighbor));
777   neighbor->peer = *peer;
778   GNUNET_assert (GNUNET_YES ==
779                  GNUNET_CONTAINER_multihashmap_put (direct_neighbors,
780                                                     &peer->hashPubKey,
781                                                     neighbor,
782                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
783   route = GNUNET_CONTAINER_multihashmap_get (all_routes, 
784                                              &peer->hashPubKey);
785   if (NULL != route)  
786   {
787     send_disconnect_to_plugin (peer);
788     release_route (route);
789     GNUNET_free (route);
790   }
791   route->next_hop = neighbor;
792   // FIXME: begin exchange_routing_information!
793 }
794
795
796
797 /**
798  * Core handler for DV data messages.  Whatever this message
799  * contains all we really have to do is rip it out of its
800  * DV layering and give it to our pal the DV plugin to report
801  * in with.
802  *
803  * @param cls closure
804  * @param peer peer which sent the message (immediate sender)
805  * @param message the message
806  * @param atsi transport ATS information (latency, distance, etc.)
807  * @param atsi_count number of entries in atsi
808  * @return GNUNET_OK on success, GNUNET_SYSERR if the other peer violated the protocol
809  */
810 static int
811 handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
812                          const struct GNUNET_MessageHeader *message,
813                          const struct GNUNET_ATS_Information *atsi,
814                          unsigned int atsi_count)
815 {
816   const struct RouteMessage *rm;
817   const struct GNUNET_MessageHeader *payload;
818   struct Route *route;
819
820   if (ntohs (message->size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader))
821   {
822     GNUNET_break_op (0);
823     return GNUNET_SYSERR;
824   }
825   rm = (const struct RouteMessage *) message;
826   payload = (const struct GNUNET_MessageHeader *) &rm[1];
827   if (ntohs (message->size) != sizeof (struct RouteMessage) + ntohs (payload->size))
828   {
829     GNUNET_break_op (0);
830     return GNUNET_SYSERR;
831   }
832   if (0 == memcmp (&rm->target,
833                    &my_identity,
834                    sizeof (struct GNUNET_PeerIdentity)))
835   {
836     /* message is for me, check reverse route! */
837     route = GNUNET_CONTAINER_multihashmap_get (all_routes,
838                                                &rm->sender.hashPubKey);
839     if (NULL == route)
840     {
841       /* don't have reverse route, drop */
842       GNUNET_STATISTICS_update (stats,
843                                 "# message discarded (no reverse route)",
844                                 1, GNUNET_NO);
845       return GNUNET_OK;
846     }
847     send_data_to_plugin (payload,
848                          &rm->sender,
849                          route->target.distance);
850     return GNUNET_OK;
851   }
852   route = GNUNET_CONTAINER_multihashmap_get (all_routes,
853                                              &rm->target.hashPubKey);
854   if (NULL == route)
855   {
856     GNUNET_STATISTICS_update (stats,
857                               "# messages discarded (no route)",
858                               1, GNUNET_NO);
859     return GNUNET_OK;
860   }
861   if (route->target.distance > ntohl (rm->distance) + 1)
862   {
863     GNUNET_STATISTICS_update (stats,
864                               "# messages discarded (target too far)",
865                               1, GNUNET_NO);
866     return GNUNET_OK;
867   }
868   forward_payload (route->next_hop,
869                    route->target.distance,
870                    &rm->sender,
871                    payload);
872   return GNUNET_OK;  
873 }
874
875
876 /**
877  * Service server's handler for message send requests (which come
878  * bubbling up to us through the DV plugin).
879  *
880  * @param cls closure
881  * @param client identification of the client
882  * @param message the actual message
883  */
884 static void
885 handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
886                         const struct GNUNET_MessageHeader *message)
887 {
888   struct Route *route;
889   const struct GNUNET_DV_SendMessage *msg;
890   const struct GNUNET_MessageHeader *payload;
891
892   if (ntohs (message->size) < sizeof (struct GNUNET_DV_SendMessage) + sizeof (struct GNUNET_MessageHeader))
893   {
894     GNUNET_break (0);
895     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
896     return;
897   }
898   msg = (const struct GNUNET_DV_SendMessage *) message;
899   payload = (const struct GNUNET_MessageHeader *) &msg[1];
900   if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size))
901   {
902     GNUNET_break (0);
903     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
904     return;
905   }
906   route = GNUNET_CONTAINER_multihashmap_get (all_routes,
907                                              &msg->target.hashPubKey);
908   if (NULL == route)
909   {
910     /* got disconnected, send ACK anyway? 
911        FIXME: What we really want is an 'NACK' here... */
912     GNUNET_STATISTICS_update (stats,
913                               "# local messages discarded (no route)",
914                               1, GNUNET_NO);
915     send_ack_to_plugin (&msg->target, htonl (msg->uid));
916     GNUNET_SERVER_receive_done (client, GNUNET_OK);
917     return;
918   }
919   // FIXME: flow control (send ACK only once message has left the queue...)
920   send_ack_to_plugin (&msg->target, htonl (msg->uid));
921   forward_payload (route->next_hop,
922                    route->target.distance,
923                    &my_identity,
924                    payload);
925   GNUNET_SERVER_receive_done (client, GNUNET_OK);
926 }
927
928
929 /**
930  * Multihashmap iterator for freeing routes that go via a particular
931  * neighbor that disconnected and is thus no longer available.
932  *
933  * @param cls the direct neighbor that is now unavailable
934  * @param key key value stored under
935  * @param value a 'struct Route' that may or may not go via neighbor
936  *
937  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
938  */
939 static int
940 cull_routes (void *cls, const struct GNUNET_HashCode * key, void *value)
941 {
942   struct DirectNeighbor *neighbor = cls;
943   struct Route *route = value;
944
945   if (route->next_hop != neighbor)
946     return GNUNET_YES; /* not affected */
947   GNUNET_assert (GNUNET_YES ==
948                  GNUNET_CONTAINER_multihashmap_remove (all_routes, key, value));
949   release_route (route);
950   send_disconnect_to_plugin (&route->target.peer);
951   GNUNET_free (route);
952   return GNUNET_YES;
953 }
954
955
956 /**
957  * Multihashmap iterator for checking if a given route is
958  * (now) useful to this peer.
959  *
960  * @param cls the direct neighbor for the given route
961  * @param key key value stored under
962  * @param value a 'struct Target' that may or may not be useful; not that
963  *        the distance in 'target' does not include the first hop yet
964  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
965  */
966 static int
967 check_possible_route (void *cls, const struct GNUNET_HashCode * key, void *value)
968 {
969   struct DirectNeighbor *neighbor = cls;
970   struct Target *target = value;
971   struct Route *route;
972   
973   route = GNUNET_CONTAINER_multihashmap_get (all_routes,
974                                            key);
975   if (NULL != route)
976   {
977     if (route->target.distance > target->distance + 1)
978     {
979       /* this 'target' is cheaper than the existing route; switch to alternative route! */
980       move_route (route, target->distance + 1);
981       route->next_hop = neighbor;
982       // FIXME: notify plugin about distance update?
983     }
984     return GNUNET_YES; /* got a route to this target already */
985   }
986   route = GNUNET_malloc (sizeof (struct Route));
987   route->next_hop = neighbor;
988   route->target.distance = target->distance + 1;
989   route->target.peer = target->peer;
990   allocate_route (route, route->target.distance);
991   GNUNET_assert (GNUNET_YES ==
992                  GNUNET_CONTAINER_multihashmap_put (all_routes,
993                                                     &route->target.peer.hashPubKey,
994                                                     route,
995                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
996   send_connect_to_plugin (&route->target.peer, target->distance);
997   return GNUNET_YES;
998 }
999
1000
1001 /**
1002  * Multihashmap iterator for finding routes that were previously
1003  * "hidden" due to a better route (called after a disconnect event).
1004  *
1005  * @param cls NULL
1006  * @param key peer identity of the given direct neighbor
1007  * @param value a 'struct DirectNeighbor' to check for additional routes
1008  * @return GNUNET_YES to continue iteration
1009  */
1010 static int
1011 refresh_routes (void *cls, const struct GNUNET_HashCode * key, void *value)
1012 {
1013   struct DirectNeighbor *neighbor = value;
1014
1015   if (NULL != neighbor->neighbor_table)
1016     GNUNET_CONTAINER_multihashmap_iterate (neighbor->neighbor_table,
1017                                            &check_possible_route,
1018                                            neighbor);
1019   return GNUNET_YES;
1020 }
1021
1022
1023 /**
1024  * Cleanup all of the data structures associated with a given neighbor.
1025  *
1026  * @param neighbor neighbor to clean up
1027  */
1028 static void
1029 cleanup_neighbor (struct DirectNeighbor *neighbor)
1030 {
1031   struct PendingMessage *pending;
1032
1033   while (NULL != (pending = neighbor->pm_head))
1034   {
1035     neighbor->pm_queue_size--;
1036     GNUNET_CONTAINER_DLL_remove (neighbor->pm_head,
1037                                  neighbor->pm_tail,
1038                                  pending);    
1039     GNUNET_free (pending);
1040   }
1041   GNUNET_CONTAINER_multihashmap_iterate (all_routes,
1042                                          &cull_routes,
1043                                          neighbor);
1044   if (NULL != neighbor->cth)
1045   {
1046     GNUNET_CORE_notify_transmit_ready_cancel (neighbor->cth);
1047     neighbor->cth = NULL;
1048   }
1049   if (GNUNET_SCHEDULER_NO_TASK != neighbor->consensus_task)
1050   {
1051     GNUNET_SCHEDULER_cancel (neighbor->consensus_task);
1052     neighbor->consensus_task = GNUNET_SCHEDULER_NO_TASK;
1053   }
1054   if (NULL != neighbor->consensus)
1055   {
1056     GNUNET_CONSENSUS_destroy (neighbor->consensus);
1057     neighbor->consensus = NULL;
1058   }
1059   GNUNET_assert (GNUNET_YES ==
1060                  GNUNET_CONTAINER_multihashmap_remove (direct_neighbors, 
1061                                                        &neighbor->peer.hashPubKey,
1062                                                        neighbor));
1063   GNUNET_free (neighbor);
1064 }
1065
1066
1067 /**
1068  * Method called whenever a given peer disconnects.
1069  *
1070  * @param cls closure
1071  * @param peer peer identity this notification is about
1072  */
1073 static void
1074 handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
1075 {
1076   struct DirectNeighbor *neighbor;
1077
1078   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079               "Received core peer disconnect message for peer `%s'!\n",
1080               GNUNET_i2s (peer));
1081   /* Check for disconnect from self message */
1082   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
1083     return;
1084   neighbor =
1085       GNUNET_CONTAINER_multihashmap_get (direct_neighbors, &peer->hashPubKey);
1086   if (NULL == neighbor)
1087   {
1088     /* must have been a DV-neighbor, ignore */
1089     return;
1090   }
1091   cleanup_neighbor (neighbor);
1092   GNUNET_CONTAINER_multihashmap_iterate (direct_neighbors,
1093                                          &refresh_routes,
1094                                          NULL);
1095 }
1096
1097
1098 /**
1099  * Multihashmap iterator for freeing routes.  Should never be called.
1100  *
1101  * @param cls NULL
1102  * @param key key value stored under
1103  * @param value the route to be freed
1104  *
1105  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
1106  */
1107 static int
1108 free_route (void *cls, const struct GNUNET_HashCode * key, void *value)
1109 {
1110   struct Route *route = value;
1111
1112   GNUNET_break (0);
1113   GNUNET_assert (GNUNET_YES ==
1114                  GNUNET_CONTAINER_multihashmap_remove (all_routes, key, value));
1115   release_route (route);
1116   send_disconnect_to_plugin (&route->target.peer);
1117   GNUNET_free (route);
1118   return GNUNET_YES;
1119 }
1120
1121
1122 /**
1123  * Multihashmap iterator for freeing direct neighbors. Should never be called.
1124  *
1125  * @param cls NULL
1126  * @param key key value stored under
1127  * @param value the direct neighbor to be freed
1128  *
1129  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
1130  */
1131 static int
1132 free_direct_neighbors (void *cls, const struct GNUNET_HashCode * key, void *value)
1133 {
1134   struct DirectNeighbor *neighbor = value;
1135
1136   GNUNET_break (0);
1137   cleanup_neighbor (neighbor);
1138   return GNUNET_YES;
1139 }
1140
1141
1142 /**
1143  * Task run during shutdown.
1144  *
1145  * @param cls unused
1146  * @param tc unused
1147  */
1148 static void
1149 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1150 {
1151   struct PendingMessage *pending;
1152   unsigned int i;
1153
1154   GNUNET_CONTAINER_multihashmap_iterate (direct_neighbors,
1155                                          &free_direct_neighbors, NULL);
1156   GNUNET_CONTAINER_multihashmap_destroy (direct_neighbors);
1157   GNUNET_CONTAINER_multihashmap_iterate (all_routes,
1158                                          &free_route, NULL);
1159   GNUNET_CONTAINER_multihashmap_destroy (all_routes);
1160   GNUNET_CORE_disconnect (core_api);
1161   core_api = NULL;
1162   GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1163   stats = NULL;
1164   while (NULL != (pending = plugin_pending_head))
1165   {
1166     GNUNET_CONTAINER_DLL_remove (plugin_pending_head,
1167                                  plugin_pending_tail,
1168                                  pending);
1169     GNUNET_free (pending);
1170   }
1171   for (i=0;i<DEFAULT_FISHEYE_DEPTH - 1;i++)
1172     GNUNET_array_grow (consensi[i].targets,
1173                        consensi[i].array_length,
1174                        0);
1175 }
1176
1177
1178 /**
1179  * Handle START-message.  This is the first message sent to us
1180  * by the client (can only be one!).
1181  *
1182  * @param cls closure (always NULL)
1183  * @param client identification of the client
1184  * @param message the actual message
1185  */
1186 static void
1187 handle_start (void *cls, struct GNUNET_SERVER_Client *client,
1188               const struct GNUNET_MessageHeader *message)
1189 {
1190   if (NULL != client_handle)
1191   {
1192     /* forcefully drop old client */
1193     GNUNET_SERVER_client_disconnect (client_handle);
1194     GNUNET_SERVER_client_drop (client_handle);
1195   }
1196   client_handle = client;
1197   GNUNET_SERVER_client_keep (client_handle);
1198   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1199 }
1200
1201
1202 /**
1203  * Called on core init.
1204  *
1205  * @param cls unused
1206  * @param server legacy
1207  * @param identity this peer's identity
1208  */
1209 static void
1210 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1211            const struct GNUNET_PeerIdentity *identity)
1212 {
1213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214               "I am peer: %s\n",
1215               GNUNET_i2s (identity));
1216   my_identity = *identity;
1217 }
1218
1219
1220 /**
1221  * Process dv requests.
1222  *
1223  * @param cls closure
1224  * @param server the initialized server
1225  * @param c configuration to use
1226  */
1227 static void
1228 run (void *cls, struct GNUNET_SERVER_Handle *server,
1229      const struct GNUNET_CONFIGURATION_Handle *c)
1230 {
1231   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1232     {&handle_dv_route_message, GNUNET_MESSAGE_TYPE_DV_ROUTE, 0},
1233     {NULL, 0, 0}
1234   };
1235   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
1236     {&handle_start, NULL, 
1237      GNUNET_MESSAGE_TYPE_DV_START, 
1238      sizeof (struct GNUNET_MessageHeader) },
1239     { &handle_dv_send_message, NULL, 
1240       GNUNET_MESSAGE_TYPE_DV_SEND, 
1241       0},
1242     {NULL, NULL, 0, 0}
1243   };
1244
1245   cfg = c;
1246   direct_neighbors = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
1247   all_routes = GNUNET_CONTAINER_multihashmap_create (65536, GNUNET_NO);
1248   core_api = GNUNET_CORE_connect (cfg, NULL,
1249                                   &core_init, 
1250                                   &handle_core_connect,
1251                                   &handle_core_disconnect,
1252                                   NULL, GNUNET_NO, 
1253                                   NULL, GNUNET_NO, 
1254                                   core_handlers);
1255
1256   if (NULL == core_api)
1257     return;
1258   stats = GNUNET_STATISTICS_create ("dv", cfg);
1259   GNUNET_SERVER_add_handlers (server, plugin_handlers);
1260   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1261                                 &shutdown_task, NULL);
1262 }
1263
1264
1265 /**
1266  * The main function for the dv service.
1267  *
1268  * @param argc number of arguments from the command line
1269  * @param argv command line arguments
1270  * @return 0 ok, 1 on error
1271  */
1272 int
1273 main (int argc, char *const *argv)
1274 {
1275   return (GNUNET_OK ==
1276           GNUNET_SERVICE_run (argc, argv, "dv", GNUNET_SERVICE_OPTION_NONE,
1277                               &run, NULL)) ? 0 : 1;
1278 }
1279
1280 /* end of gnunet-service-dv.c */