fix starting stddevs/avgs
[oweals/gnunet.git] / src / nse / gnunet-service-nse.c
1 /*
2   This file is part of GNUnet.
3   (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4   
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9   
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14   
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file nse/gnunet-service-nse.c
23  * @brief network size estimation service
24  * @author Nathan Evans
25  * @author Christian Grothoff
26  *
27  * The purpose of this service is to estimate the size of the network.
28  * Given a specified interval, each peer hashes the most recent
29  * timestamp which is evenly divisible by that interval.  This hash is
30  * compared in distance to the peer identity to choose an offset.  The
31  * closer the peer identity to the hashed timestamp, the earlier the
32  * peer sends out a "nearest peer" message.  The closest peer's
33  * message should thus be received before any others, which stops
34  * those peer from sending their messages at a later duration.  So
35  * every peer should receive the same nearest peer message, and from
36  * this can calculate the expected number of peers in the network.
37  *
38  * TODO:
39  * - generate proof-of-work asynchronously, store it on disk & load it back
40  * - handle messages for future round (one into the future, see FIXME)
41  */
42 #include "platform.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet_constants.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_statistics_service.h"
48 #include "gnunet_core_service.h"
49 #include "gnunet_nse_service.h"
50 #include "nse.h"
51
52 /**
53  * Over how many values do we calculate the weighted average?
54  */
55 #define HISTORY_SIZE 8
56
57 /**
58  * Size of the queue to core.
59  */
60 #define CORE_QUEUE_SIZE 2
61
62 /**
63  * Message priority to use.
64  */
65 #define NSE_PRIORITY 5
66
67 /**
68  * Amount of work required (W-bit collisions) for NSE proofs, in collision-bits.
69  */
70 #define NSE_WORK_REQUIRED 0
71
72 /**
73  * Interval for sending network size estimation flood requests.
74  */
75 #define GNUNET_NSE_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
76
77
78 /**
79  * Per-peer information.
80  */
81 struct NSEPeerEntry
82 {
83
84   /**
85    * Pending message for this peer.
86    */
87   struct GNUNET_MessageHeader *pending_message;
88
89   /**
90    * Core handle for sending messages to this peer.
91    */
92   struct GNUNET_CORE_TransmitHandle *th;
93
94   /**
95    * What is the identity of the peer?
96    */
97   struct GNUNET_PeerIdentity id;
98
99   /**
100    * Task scheduled to send message to this peer.
101    */
102   GNUNET_SCHEDULER_TaskIdentifier transmit_task;
103
104   /**
105    * Did we receive or send a message about the previous round
106    * to this peer yet?  
107    */
108   int previous_round;
109 };
110
111
112 /**
113  * Network size estimate reply; sent when "this"
114  * peer's timer has run out before receiving a
115  * valid reply from another peer.
116  */
117 struct GNUNET_NSE_FloodMessage
118 {
119   /**
120    * Type: GNUNET_MESSAGE_TYPE_NSE_P2P_FLOOD
121    */
122   struct GNUNET_MessageHeader header;
123
124   /**
125    * Number of hops this message has taken so far.
126    */
127   uint32_t hop_count;
128
129   /**
130    * Purpose.
131    */
132   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
133
134   /**
135    * The current timestamp value (which all
136    * peers should agree on).
137    */
138   struct GNUNET_TIME_AbsoluteNBO timestamp;
139
140   /**
141    * Number of matching bits between the hash
142    * of timestamp and the initiator's public
143    * key.
144    */
145   uint32_t matching_bits;
146
147   /**
148    * Public key of the originator.
149    */
150   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey;
151
152   /**
153    * Proof of work, causing leading zeros when hashed with pkey.
154    */
155   uint64_t proof_of_work;
156
157   /**
158    * Signature (over range specified in purpose).
159    */
160   struct GNUNET_CRYPTO_RsaSignature signature;
161 };
162
163
164 /**
165  * Handle to our current configuration.
166  */
167 static const struct GNUNET_CONFIGURATION_Handle *cfg;
168
169 /**
170  * Handle to the statistics service.
171  */
172 static struct GNUNET_STATISTICS_Handle *stats;
173
174 /**
175  * Handle to the core service.
176  */
177 static struct GNUNET_CORE_Handle *coreAPI;
178
179 /**
180  * Map of all connected peers.
181  */
182 static struct GNUNET_CONTAINER_MultiHashMap *peers;
183
184 /**
185  * The current network size estimate.  Number of bits matching on
186  * average thus far. 
187  */
188 static double current_size_estimate;
189
190 /**
191  * The standard deviation of the last HISTORY_SIZE network
192  * size estimates.
193  */
194 static double current_std_dev = NAN;
195
196 /**
197  * Current hop counter estimate (estimate for network diameter).
198  */
199 static uint32_t hop_count_max;
200
201 /**
202  * Array of recent size estimate messages.
203  */
204 static struct GNUNET_NSE_FloodMessage size_estimate_messages[HISTORY_SIZE];
205
206 /**
207  * Index of most recent estimate.
208  */
209 static unsigned int estimate_index;
210
211 /**
212  * Number of valid entries in the history.
213  */
214 static unsigned int estimate_count;
215
216 /**
217  * Task scheduled to update our flood message for the next round.
218  */
219 static GNUNET_SCHEDULER_TaskIdentifier flood_task;
220
221 /**
222  * Notification context, simplifies client broadcasts.
223  */
224 static struct GNUNET_SERVER_NotificationContext *nc;
225
226 /**
227  * The next major time.
228  */
229 static struct GNUNET_TIME_Absolute next_timestamp;
230
231 /**
232  * The current major time.
233  */
234 static struct GNUNET_TIME_Absolute current_timestamp;
235
236 /**
237  * The public key of this peer.
238  */
239 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
240
241 /**
242  * The private key of this peer.
243  */
244 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
245
246 /**
247  * The peer identity of this peer.
248  */
249 static struct GNUNET_PeerIdentity my_identity;
250
251 /**
252  * Proof of work for this peer.
253  */
254 static uint64_t my_proof;
255
256
257 /**
258  * Initialize a message to clients with the current network
259  * size estimate.
260  *
261  * @param em message to fill in
262  */
263 static void
264 setup_estimate_message (struct GNUNET_NSE_ClientMessage *em)
265 {
266   unsigned int i;
267   double mean;
268   double sum;
269   double std_dev;
270   double variance;
271   double val;
272   double weight;
273   double sumweight;
274   double q;
275   double r;
276   double temp;
277
278   /* Weighted incremental algorithm for stddev according to West (1979) */
279   mean = 0.0;
280   sum = 0.0;
281   sumweight = 0.0;
282   for (i=0;i<estimate_count; i++)
283     {
284       val = htonl (size_estimate_messages[(estimate_index - i + HISTORY_SIZE) % HISTORY_SIZE].matching_bits);
285       weight = estimate_count + 1 - i;
286
287       temp = weight + sumweight;
288       q = val - mean;
289       r = q * weight / temp;
290       sum += sumweight * q * r;
291       mean += r;
292       sumweight = temp;
293     }
294   variance = sum / (sumweight - 1.0);
295   GNUNET_assert (variance >= 0);
296   std_dev = sqrt (variance);
297   current_std_dev = std_dev;
298   current_size_estimate = mean;
299
300   em->header.size
301     = htons (sizeof(struct GNUNET_NSE_ClientMessage));
302   em->header.type
303     = htons (GNUNET_MESSAGE_TYPE_NSE_ESTIMATE);
304   em->reserved = htonl (0);
305   em->size_estimate = mean - 0.5;
306   em->std_deviation = std_dev;
307   GNUNET_STATISTICS_set (stats, 
308                          "# nodes in the network (estimate)",
309                          (uint64_t) pow (2, mean - 0.5), GNUNET_NO);
310 }
311
312
313 /**
314  * Handler for START message from client, triggers an
315  * immediate current network estimate notification.
316  * Also, we remember the client for updates upon future
317  * estimate measurements.
318  *
319  * @param cls unused
320  * @param client who sent the message
321  * @param message the message received
322  */
323 static void
324 handle_start_message(void *cls, struct GNUNET_SERVER_Client *client,
325                      const struct GNUNET_MessageHeader *message)
326 {
327   struct GNUNET_NSE_ClientMessage em;
328 #if DEBUG_NSE
329   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 
330              "Received START message from client\n");
331 #endif
332   GNUNET_SERVER_notification_context_add (nc, client);
333   setup_estimate_message (&em);
334   GNUNET_SERVER_notification_context_unicast (nc, client, &em.header, GNUNET_YES);
335   GNUNET_SERVER_receive_done (client, GNUNET_OK);
336 }
337
338
339 /**
340  * How long should we delay a message to go the given number of
341  * matching bits?
342  *
343  * @param matching_bits number of matching bits to consider
344  */
345 static double
346 get_matching_bits_delay (uint32_t matching_bits)
347 {
348   /* Calculated as: S + f/2 - (f / pi) * (atan(x - p'))*/  
349   // S is next_timestamp
350   // f is frequency (GNUNET_NSE_INTERVAL)
351   // x is matching_bits
352   // p' is current_size_estimate
353   return ((double) GNUNET_NSE_INTERVAL.rel_value / (double) 2)
354     - ((GNUNET_NSE_INTERVAL.rel_value / M_PI) * atan (matching_bits - current_size_estimate));
355 }
356
357
358 /**
359  * What delay randomization should we apply for a given number of matching bits?
360  *
361  * @param matching_bits number of matching bits
362  * @return random delay to apply 
363  */
364 static struct GNUNET_TIME_Relative 
365 get_delay_randomization (uint32_t matching_bits)
366 {
367   struct GNUNET_TIME_Relative ret;
368
369   if (matching_bits == 0)
370     return GNUNET_TIME_UNIT_ZERO;
371   ret.rel_value = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
372                                             (uint32_t) (get_matching_bits_delay (matching_bits - 1) / (double) (hop_count_max + 1)));
373   return ret;
374 }
375
376
377 /**
378  * Get the number of matching bits that the given timestamp has to the given peer ID.
379  *
380  * @param timestamp time to generate key
381  * @param id peer identity to compare with
382  * @return number of matching bits
383  */
384 static uint32_t
385 get_matching_bits (struct GNUNET_TIME_Absolute timestamp,
386                    const struct GNUNET_PeerIdentity *id)
387 {
388   GNUNET_HashCode timestamp_hash;
389
390   GNUNET_CRYPTO_hash (&timestamp.abs_value,
391                       sizeof(timestamp.abs_value), 
392                       &timestamp_hash);
393   return GNUNET_CRYPTO_hash_matching_bits (&timestamp_hash,
394                                            &id->hashPubKey);
395 }
396
397
398 /**
399  * Get the transmission delay that should be applied for a 
400  * particular round.
401  *
402  * @param round_offset -1 for the previous round (random delay between 0 and 50ms)
403  *                      0 for the current round (based on our proximity to time key)
404  * @return delay that should be applied
405  */
406 static struct GNUNET_TIME_Relative
407 get_transmit_delay (int round_offset)
408 {
409   struct GNUNET_TIME_Relative ret;
410   struct GNUNET_TIME_Absolute tgt;
411   double dist_delay;
412   uint32_t matching_bits;
413
414   switch (round_offset)
415     {
416     case -1:
417       /* previous round is randomized between 0 and 50 ms */
418       ret.rel_value = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
419                                                 50);
420       return ret;
421     case 0:
422       /* current round is based on best-known matching_bits */
423       matching_bits = ntohl (size_estimate_messages[estimate_index].matching_bits);
424       dist_delay = get_matching_bits_delay (matching_bits);
425       dist_delay += get_delay_randomization (matching_bits).rel_value;
426       ret.rel_value = (uint64_t) dist_delay;
427       /* now consider round start time and add delay to it */
428       tgt = GNUNET_TIME_absolute_add (current_timestamp, ret);
429       return GNUNET_TIME_absolute_get_remaining (tgt);
430     }
431   GNUNET_break (0);
432   return GNUNET_TIME_UNIT_FOREVER_REL;
433 }
434
435
436 /**
437  * Task that triggers a NSE P2P transmission.
438  *
439  * @param cls the 'struct NSEPeerEntry'
440  * @param tc scheduler context
441  */
442 static void
443 transmit_task (void *cls,
444                const struct GNUNET_SCHEDULER_TaskContext *tc);
445
446
447 /**
448  * Called when core is ready to send a message we asked for
449  * out to the destination.
450  *
451  * @param cls closure (NULL)
452  * @param size number of bytes available in buf
453  * @param buf where the callee should write the message
454  * @return number of bytes written to buf
455  */
456 static size_t
457 transmit_ready (void *cls, size_t size, void *buf)
458 {
459   struct NSEPeerEntry *peer_entry = cls;
460   unsigned int idx;
461
462   peer_entry->th = NULL;
463   if (buf == NULL)
464     {
465       /* client disconnected */
466       return 0;
467     }
468   GNUNET_assert (size >= sizeof (struct GNUNET_NSE_FloodMessage));
469 #if DEBUG_NSE > 1
470   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
471               "Sending size estimate to `%s'\n",
472               GNUNET_i2s (&peer_entry->id));
473 #endif
474   idx = estimate_index;
475   if (peer_entry->previous_round == GNUNET_YES)
476     {
477       idx = (idx + HISTORY_SIZE -1) % HISTORY_SIZE;
478       peer_entry->previous_round = GNUNET_NO;
479       peer_entry->transmit_task = GNUNET_SCHEDULER_add_delayed (get_transmit_delay (0),
480                                                                 &transmit_task,
481                                                                 peer_entry);
482     }
483   memcpy (buf,
484           &size_estimate_messages[idx],
485           sizeof (struct GNUNET_NSE_FloodMessage));
486   GNUNET_STATISTICS_update (stats, 
487                             "# flood messages sent", 
488                             1, 
489                             GNUNET_NO);
490   return sizeof (struct GNUNET_NSE_FloodMessage);
491 }
492
493
494 /**
495  * Task that triggers a NSE P2P transmission.
496  *
497  * @param cls the 'struct NSEPeerEntry'
498  * @param tc scheduler context
499  */
500 static void
501 transmit_task (void *cls,
502                const struct GNUNET_SCHEDULER_TaskContext *tc)
503 {
504   struct NSEPeerEntry *peer_entry = cls;
505  
506   peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
507   peer_entry->th
508     = GNUNET_CORE_notify_transmit_ready (coreAPI,
509                                          GNUNET_NO,
510                                          NSE_PRIORITY,
511                                          GNUNET_TIME_UNIT_FOREVER_REL,
512                                          &peer_entry->id,
513                                          sizeof (struct GNUNET_NSE_FloodMessage),
514                                          &transmit_ready, peer_entry);
515 }
516
517
518 /**
519  * We've sent on our flood message or one that we received which was
520  * validated and closer than ours.  Update the global list of recent
521  * messages and the average.  Also re-broadcast the message to any
522  * clients.
523  */
524 static void
525 update_network_size_estimate ()
526 {
527   struct GNUNET_NSE_ClientMessage em;
528
529   setup_estimate_message (&em);
530   GNUNET_SERVER_notification_context_broadcast (nc,
531                                                 &em.header,
532                                                 GNUNET_YES);    
533 }
534
535
536 /**
537  * Setup a flood message in our history array at the given
538  * slot offset for the given timestamp.
539  *
540  * @param slot index to use
541  * @param ts timestamp to use
542  */
543 static void
544 setup_flood_message (unsigned int slot,
545                      struct GNUNET_TIME_Absolute ts)
546 {
547   struct GNUNET_NSE_FloodMessage *fm;
548   uint32_t matching_bits;
549
550   matching_bits = get_matching_bits (ts, &my_identity);
551   fm = &size_estimate_messages[slot];
552   fm->header.size = htons (sizeof(struct GNUNET_NSE_FloodMessage));
553   fm->header.type = htons (GNUNET_MESSAGE_TYPE_NSE_P2P_FLOOD);
554   fm->hop_count = htonl (0);
555   fm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_NSE_SEND);
556   fm->purpose.size = htonl (sizeof(struct GNUNET_NSE_FloodMessage)
557                             - sizeof (struct GNUNET_MessageHeader) 
558                             - sizeof (uint32_t)
559                             - sizeof (struct GNUNET_CRYPTO_RsaSignature));
560   fm->matching_bits = htonl (matching_bits);
561   fm->timestamp = GNUNET_TIME_absolute_hton (ts);
562   fm->pkey = my_public_key;
563   fm->proof_of_work = my_proof;
564   GNUNET_CRYPTO_rsa_sign (my_private_key, 
565                           &fm->purpose,
566                           &fm->signature);
567 }
568
569
570 /**
571  * Schedule transmission for the given peer for the current round based
572  * on what we know about the desired delay.
573  *
574  * @param cls unused
575  * @param key hash of peer identity
576  * @param value the 'struct NSEPeerEntry'
577  * @return GNUNET_OK (continue to iterate)
578  */
579 static int
580 schedule_current_round (void *cls,
581                         const GNUNET_HashCode *key,
582                         void *value)
583 {
584   struct NSEPeerEntry *peer_entry = value;
585   struct GNUNET_TIME_Relative delay;
586
587   if (peer_entry->th != NULL)
588     {
589       peer_entry->previous_round = GNUNET_NO;
590       return GNUNET_OK;
591     }
592   if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
593     {
594       GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
595       peer_entry->previous_round = GNUNET_NO;
596     }
597   delay = get_transmit_delay ((peer_entry->previous_round == GNUNET_NO) ? -1 : 0);
598   peer_entry->transmit_task = GNUNET_SCHEDULER_add_delayed (delay,
599                                                             &transmit_task,
600                                                             peer_entry);
601   return GNUNET_OK;
602 }
603
604
605 /**
606  * Update our flood message to be sent (and our timestamps).
607  *
608  * @param cls unused
609  * @param tc context for this message
610  */
611 static void
612 update_flood_message(void *cls,
613                      const struct GNUNET_SCHEDULER_TaskContext *tc)
614 {
615   struct GNUNET_TIME_Relative offset;
616   unsigned int i;
617
618   flood_task = GNUNET_SCHEDULER_NO_TASK;
619   offset = GNUNET_TIME_absolute_get_remaining (next_timestamp);
620   if (0 != offset.rel_value)
621     {
622       /* somehow run early, delay more */
623       flood_task
624         = GNUNET_SCHEDULER_add_delayed (offset,
625                                         &update_flood_message, 
626                                         NULL);
627       return;
628     }
629   current_timestamp = next_timestamp;
630   next_timestamp = GNUNET_TIME_absolute_add (current_timestamp,
631                                              GNUNET_NSE_INTERVAL);
632   estimate_index = (estimate_index + 1) % HISTORY_SIZE;
633   if (estimate_count < HISTORY_SIZE)
634     estimate_count++;
635   setup_flood_message (estimate_index, current_timestamp);
636   hop_count_max = 0;
637   for (i=0;i<HISTORY_SIZE;i++)
638     hop_count_max = GNUNET_MAX (ntohl (size_estimate_messages[i].hop_count),
639                                 hop_count_max);
640   GNUNET_CONTAINER_multihashmap_iterate (peers,
641                                          &schedule_current_round,
642                                          NULL);
643   flood_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (next_timestamp), 
644                                              &update_flood_message, NULL);
645 }
646
647
648 /**
649  * Count the leading zeroes in hash.
650  *
651  * @param hash
652  * @return the number of leading zero bits.
653  */
654 static unsigned int 
655 count_leading_zeroes(const GNUNET_HashCode *hash)
656 {
657   unsigned int hash_count;
658   
659   hash_count = 0;
660   while ((0 == GNUNET_CRYPTO_hash_get_bit(hash, hash_count)))
661     hash_count++;
662   return hash_count;
663 }
664
665
666 /**
667  * Check whether the given public key
668  * and integer are a valid proof of work.
669  *
670  * @param pkey the public key
671  * @param val the integer
672  *
673  * @return GNUNET_YES if valid, GNUNET_NO if not
674  */
675 static int
676 check_proof_of_work(const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey,
677                     uint64_t val)
678 {  
679   char buf[sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + sizeof(val)];
680   GNUNET_HashCode result;
681   
682   memcpy (buf,
683           &val,
684           sizeof (val));
685   memcpy (&buf[sizeof(val)],
686           pkey, 
687           sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
688   GNUNET_CRYPTO_hash (buf, sizeof (buf), &result);
689   return (count_leading_zeroes (&result) >= NSE_WORK_REQUIRED) ? GNUNET_YES : GNUNET_NO;
690 }
691
692
693 /**
694  * Given a public key, find an integer such that the hash of the key
695  * concatenated with the integer has NSE_WORK_REQUIRED leading 0
696  * bits.  FIXME: this is a synchronous function... bad
697  *
698  * @param pkey the public key
699  * @return 64 bit number that satisfies the requirements
700  */
701 static uint64_t 
702 find_proof_of_work(const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey)
703 {
704   uint64_t counter;
705   char buf[sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + sizeof(uint64_t)];
706   GNUNET_HashCode result;
707   
708   memcpy (&buf[sizeof(uint64_t)],
709           pkey, 
710           sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
711   counter = 0;
712   while (counter != UINT64_MAX)
713     {
714       memcpy (buf,
715               &counter, 
716               sizeof(uint64_t));
717       GNUNET_CRYPTO_hash (buf, sizeof (buf), &result);
718       if (NSE_WORK_REQUIRED <= count_leading_zeroes(&result))
719         break;
720       counter++;
721     }
722   return counter;
723 }
724
725
726 /**
727  * An incoming flood message has been received which claims
728  * to have more bits matching than any we know in this time
729  * period.  Verify the signature and/or proof of work.
730  *
731  * @param incoming_flood the message to verify
732  *
733  * @return GNUNET_YES if the message is verified
734  *         GNUNET_NO if the key/signature don't verify
735  */
736 static int 
737 verify_message_crypto(const struct GNUNET_NSE_FloodMessage *incoming_flood)
738 {
739   if (GNUNET_YES !=
740       check_proof_of_work (&incoming_flood->pkey,
741                            incoming_flood->proof_of_work))
742     {
743       GNUNET_break_op (0);
744       return GNUNET_NO;
745     }
746   if (GNUNET_OK != 
747       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_NSE_SEND,
748                                 &incoming_flood->purpose,
749                                 &incoming_flood->signature,
750                                 &incoming_flood->pkey))
751     {
752       GNUNET_break_op (0);
753       return GNUNET_NO;
754     }
755   return GNUNET_YES;
756 }
757
758
759 /**
760  * Update transmissions for the given peer for the current round based
761  * on updated proximity information.
762  *
763  * @param cls peer entry to exclude from updates
764  * @param key hash of peer identity
765  * @param value the 'struct NSEPeerEntry'
766  * @return GNUNET_OK (continue to iterate)
767  */
768 static int
769 update_flood_times (void *cls,
770                     const GNUNET_HashCode *key,
771                     void *value)
772 {
773   struct NSEPeerEntry *exclude = cls;
774   struct NSEPeerEntry *peer_entry = value;
775   struct GNUNET_TIME_Relative delay;
776
777   if (peer_entry->th != NULL)
778     return GNUNET_OK; /* already active */
779   if (peer_entry == exclude)
780     return GNUNET_OK; /* trigger of the update */
781   if (peer_entry->previous_round == GNUNET_YES)
782     {
783       /* still stuck in previous round, no point to update, check that 
784          we are active here though... */
785       GNUNET_break (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK);
786       return GNUNET_OK; 
787     }
788   if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
789     {
790       GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
791       peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
792     }
793   delay = get_transmit_delay (0);
794   peer_entry->transmit_task = GNUNET_SCHEDULER_add_delayed (delay,
795                                                             &transmit_task,
796                                                             peer_entry);
797   return GNUNET_OK;
798 }
799
800
801 /**
802  * Core handler for size estimate flooding messages.
803  *
804  * @param cls closure unused
805  * @param message message
806  * @param peer peer identity this message is from (ignored)
807  * @param atsi performance data (ignored)
808  *
809  */
810 static int
811 handle_p2p_size_estimate(void *cls, 
812                          const struct GNUNET_PeerIdentity *peer,
813                          const struct GNUNET_MessageHeader *message,
814                          const struct GNUNET_TRANSPORT_ATS_Information *atsi)
815 {
816   const struct GNUNET_NSE_FloodMessage *incoming_flood;
817   struct GNUNET_TIME_Absolute ts;
818   struct NSEPeerEntry *peer_entry;
819   uint32_t matching_bits;  
820   unsigned int idx;
821
822   incoming_flood = (const struct GNUNET_NSE_FloodMessage *) message;
823   GNUNET_STATISTICS_update (stats, 
824                             "# flood messages received", 
825                             1,
826                             GNUNET_NO);
827   matching_bits = ntohl (incoming_flood->matching_bits);
828   peer_entry = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey);
829   if (NULL == peer_entry)
830     {
831       GNUNET_break (0);
832       return GNUNET_OK;
833     }
834   ts = GNUNET_TIME_absolute_ntoh (incoming_flood->timestamp);
835   if (ts.abs_value == current_timestamp.abs_value)
836     idx = estimate_index;
837   else if (ts.abs_value == current_timestamp.abs_value - GNUNET_NSE_INTERVAL.rel_value)
838     idx = (estimate_index + HISTORY_SIZE - 1) % HISTORY_SIZE;
839   else if (ts.abs_value == next_timestamp.abs_value - GNUNET_NSE_INTERVAL.rel_value)
840     {
841       if (GNUNET_YES !=
842           verify_message_crypto (incoming_flood))
843         {
844           GNUNET_break_op (0);
845           return GNUNET_OK;
846         }
847       /* FIXME: keep in special 'future' buffer until next round starts for us! */
848       GNUNET_break (0); /* not implemented */
849       return GNUNET_OK;
850     }
851   else
852     {
853       GNUNET_STATISTICS_update (stats,
854                                 "# flood messages discarded (clock skew too large)",
855                                 1,
856                                 GNUNET_NO);
857       GNUNET_break_op (0);
858       return GNUNET_OK;
859     }
860   if (0 == (memcmp (peer, &my_identity, sizeof(struct GNUNET_PeerIdentity))))
861     {
862       /* send to self, update our own estimate IF this also comes from us! */
863       if (0 == memcmp (&incoming_flood->pkey,
864                        &my_public_key,
865                        sizeof (my_public_key)))                
866         update_network_size_estimate ();
867       return GNUNET_OK; 
868     }
869   if (matching_bits >= ntohl (size_estimate_messages[idx].matching_bits))        
870     {      
871       /* cancel transmission from us to this peer for this round */
872       if (idx == estimate_index)
873         {
874           if (peer_entry->previous_round == GNUNET_NO)
875             {
876               /* cancel any activity for current round */
877               if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
878                 {
879                   GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
880                   peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
881                   peer_entry->previous_round = GNUNET_NO;
882                 }
883               if (peer_entry->th != NULL)
884                 {
885                   GNUNET_CORE_notify_transmit_ready_cancel (peer_entry->th);
886                   peer_entry->th = NULL;
887                 }
888             }
889         }
890       else
891         {
892           /* cancel previous round only */
893           peer_entry->previous_round = GNUNET_NO;
894         }
895  
896     }
897   if (matching_bits <= ntohl (size_estimate_messages[idx].matching_bits)) 
898     {
899       /* Not closer than our most recent message, no need to do work here */
900       GNUNET_STATISTICS_update (stats,
901                                 "# flood messages ignored (had closer already)",
902                                 1,
903                                 GNUNET_NO);
904       return GNUNET_OK;
905     }
906   if (GNUNET_YES !=
907       verify_message_crypto (incoming_flood))
908     {
909       GNUNET_break_op (0);
910       return GNUNET_OK;
911     }
912   size_estimate_messages[idx] = *incoming_flood;
913   size_estimate_messages[idx].hop_count = htonl (ntohl (incoming_flood->hop_count) + 1);
914   hop_count_max = GNUNET_MAX (ntohl (incoming_flood->hop_count) + 1,
915                               hop_count_max);
916
917   /* have a new, better size estimate, inform clients */
918   update_network_size_estimate ();
919
920   /* flood to rest */
921   GNUNET_CONTAINER_multihashmap_iterate (peers,
922                                          &update_flood_times,
923                                          peer_entry);
924   return GNUNET_OK;
925 }
926
927
928
929 /**
930  * Method called whenever a peer connects.
931  *
932  * @param cls closure
933  * @param peer peer identity this notification is about
934  * @param atsi performance data
935  */
936 static void
937 handle_core_connect(void *cls, const struct GNUNET_PeerIdentity *peer,
938                     const struct GNUNET_TRANSPORT_ATS_Information *atsi)
939 {
940   struct NSEPeerEntry *peer_entry;
941
942   peer_entry = GNUNET_malloc(sizeof(struct NSEPeerEntry));
943   peer_entry->id = *peer;
944   GNUNET_CONTAINER_multihashmap_put (peers,
945                                      &peer->hashPubKey,
946                                      peer_entry,
947                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
948   peer_entry->transmit_task = GNUNET_SCHEDULER_add_delayed (get_transmit_delay (-1),
949                                                             &transmit_task,
950                                                             peer_entry);
951 }
952
953
954 /**
955  * Method called whenever a peer disconnects.
956  *
957  * @param cls closure
958  * @param peer peer identity this notification is about
959  */
960 static void
961 handle_core_disconnect(void *cls, const struct GNUNET_PeerIdentity *peer)
962 {
963   struct NSEPeerEntry *pos;
964
965   pos = GNUNET_CONTAINER_multihashmap_get (peers,
966                                            &peer->hashPubKey);
967   if (NULL == pos)
968     {
969       GNUNET_break (0);
970       return;
971     }
972   GNUNET_assert (GNUNET_YES ==
973                  GNUNET_CONTAINER_multihashmap_remove (peers, 
974                                                        &peer->hashPubKey,
975                                                        pos));
976   if (pos->transmit_task != GNUNET_SCHEDULER_NO_TASK)
977     GNUNET_SCHEDULER_cancel (pos->transmit_task);
978   if (pos->th != NULL)
979     GNUNET_CORE_notify_transmit_ready_cancel (pos->th);
980   GNUNET_free(pos);
981 }
982
983
984 /**
985  * Task run during shutdown.
986  *
987  * @param cls unused
988  * @param tc unused
989  */
990 static void
991 shutdown_task(void *cls,
992               const struct GNUNET_SCHEDULER_TaskContext *tc)
993 {
994   if (flood_task != GNUNET_SCHEDULER_NO_TASK)
995     {
996       GNUNET_SCHEDULER_cancel (flood_task);
997       flood_task = GNUNET_SCHEDULER_NO_TASK;
998     }
999   if (nc != NULL)
1000     {
1001       GNUNET_SERVER_notification_context_destroy (nc);
1002       nc = NULL;
1003     }
1004   if (coreAPI != NULL)
1005     {
1006       GNUNET_CORE_disconnect (coreAPI);
1007       coreAPI = NULL;
1008     }
1009   if (stats != NULL)
1010     {
1011       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1012       stats = NULL;
1013     }
1014   if (peers != NULL)
1015     {
1016       GNUNET_CONTAINER_multihashmap_destroy (peers);
1017       peers = NULL;
1018     }
1019 }
1020
1021
1022 /**
1023  * Called on core init/fail.
1024  *
1025  * @param cls service closure
1026  * @param server handle to the server for this service
1027  * @param identity the public identity of this peer
1028  * @param publicKey the public key of this peer
1029  */
1030 static void
1031 core_init (void *cls, struct GNUNET_CORE_Handle *server,
1032            const struct GNUNET_PeerIdentity *identity,
1033            const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
1034 {
1035   struct GNUNET_TIME_Absolute now;
1036   struct GNUNET_TIME_Absolute prev_time;
1037   unsigned int i;
1038
1039   if (server == NULL)
1040     {
1041 #if DEBUG_NSE
1042       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1043                   "Connection to core FAILED!\n");
1044 #endif
1045       GNUNET_SCHEDULER_shutdown ();
1046       return;
1047     }
1048   my_identity = *identity;
1049   my_public_key = *publicKey;
1050
1051   now = GNUNET_TIME_absolute_get ();
1052   current_timestamp.abs_value = (now.abs_value / GNUNET_NSE_INTERVAL.rel_value) * GNUNET_NSE_INTERVAL.rel_value;
1053   next_timestamp.abs_value = current_timestamp.abs_value + GNUNET_NSE_INTERVAL.rel_value;
1054   
1055   for (i=0;i<HISTORY_SIZE;i++)
1056     {
1057       prev_time.abs_value = current_timestamp.abs_value - (HISTORY_SIZE - i - 1) * GNUNET_NSE_INTERVAL.rel_value;
1058       setup_flood_message (i, prev_time);
1059     }
1060   estimate_index = HISTORY_SIZE - 1;
1061   estimate_count = 2;
1062   flood_task
1063     = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (next_timestamp),
1064                                     &update_flood_message, NULL);
1065   my_proof = find_proof_of_work (&my_public_key);
1066 }
1067
1068
1069 /**
1070  * Handle network size estimate clients.
1071  *
1072  * @param cls closure
1073  * @param server the initialized server
1074  * @param c configuration to use
1075  */
1076 static void
1077 run(void *cls, struct GNUNET_SERVER_Handle *server,
1078     const struct GNUNET_CONFIGURATION_Handle *c)
1079 {
1080   char *keyfile;
1081
1082   static const struct GNUNET_SERVER_MessageHandler handlers[] =
1083     {
1084       { &handle_start_message, NULL, GNUNET_MESSAGE_TYPE_NSE_START, sizeof (struct GNUNET_MessageHeader) },
1085       { NULL, NULL, 0, 0 } 
1086     };
1087   static const struct GNUNET_CORE_MessageHandler core_handlers[] =
1088     {
1089       { &handle_p2p_size_estimate, GNUNET_MESSAGE_TYPE_NSE_P2P_FLOOD, sizeof (struct GNUNET_NSE_FloodMessage) },
1090       { NULL, 0, 0 } 
1091     };
1092   cfg = c;
1093   if (GNUNET_OK != 
1094       GNUNET_CONFIGURATION_get_value_filename (cfg,
1095                                                "GNUNETD", "HOSTKEY",
1096                                                &keyfile))
1097     {
1098       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
1099                   _ ("NSE service is lacking key configuration settings.  Exiting.\n"));
1100       GNUNET_SCHEDULER_shutdown ();
1101       return;
1102     }
1103   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
1104   GNUNET_free (keyfile);
1105   if (my_private_key == NULL)
1106     {
1107       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1108                   _("NSE service could not access hostkey.  Exiting.\n"));
1109       GNUNET_SCHEDULER_shutdown ();
1110       return;
1111     }
1112   peers = GNUNET_CONTAINER_multihashmap_create (128);
1113   GNUNET_SERVER_add_handlers (server, handlers);
1114   nc = GNUNET_SERVER_notification_context_create (server, 1);
1115   /* Connect to core service and register core handlers */
1116   coreAPI = GNUNET_CORE_connect (cfg, /* Main configuration */
1117                                  CORE_QUEUE_SIZE, /* queue size */
1118                                  NULL, /* Closure passed to functions */
1119                                  &core_init, /* Call core_init once connected */
1120                                  &handle_core_connect, /* Handle connects */
1121                                  &handle_core_disconnect, /* Handle disconnects */
1122                                  NULL, /* Do we care about "status" updates? */
1123                                  NULL, /* Don't want notified about all incoming messages */
1124                                  GNUNET_NO, /* For header only inbound notification */
1125                                  NULL, /* Don't want notified about all outbound messages */
1126                                  GNUNET_NO, /* For header only outbound notification */
1127                                  core_handlers); /* Register these handlers */
1128   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1129                                 &shutdown_task, NULL);
1130   if (coreAPI == NULL)
1131     {
1132       GNUNET_SCHEDULER_shutdown ();
1133       return;
1134     }
1135   stats = GNUNET_STATISTICS_create ("nse", cfg);
1136 }
1137
1138
1139 /**
1140  * The main function for the statistics service.
1141  *
1142  * @param argc number of arguments from the command line
1143  * @param argv command line arguments
1144  * @return 0 ok, 1 on error
1145  */
1146 int
1147 main(int argc, char * const *argv)
1148 {
1149   return (GNUNET_OK == GNUNET_SERVICE_run (argc, argv, 
1150                                            "nse",
1151                                            GNUNET_SERVICE_OPTION_NONE, &run,
1152                                            NULL)) ? 0 : 1;
1153 }
1154
1155 /* end of gnunet-service-nse.c */
1156