indentation fixes
[oweals/gnunet.git] / src / dht / gnunet_dht_profiler.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet_dht_profiler.c
23  * @brief Profiler for GNUnet DHT
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in>
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_testbed_service.h"
30 #include "gnunet_dht_service.h"
31
32 #define INFO(...)                                       \
33   GNUNET_log (GNUNET_ERROR_TYPE_INFO, __VA_ARGS__)
34
35 #define DEBUG(...)                                           \
36   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
37
38 /**
39  * Number of peers which should perform a PUT out of 100 peers
40  */
41 #define PUT_PROBABILITY 50
42
43 /**
44  * Configuration
45  */
46 static struct GNUNET_CONFIGURATION_Handle *cfg;
47
48 /**
49  * Name of the file with the hosts to run the test over
50  */
51 static char *hosts_file;
52
53 /**
54  * Context for a peer which actively does DHT PUT/GET
55  */
56 struct ActiveContext;
57
58 /**
59  * Context to hold data of peer
60  */
61 struct Context
62 {
63   /**
64    * The testbed peer this context belongs to
65    */
66   struct GNUNET_TESTBED_Peer *peer;
67
68   /**
69    * Testbed operation acting on this peer
70    */
71   struct GNUNET_TESTBED_Operation *op;
72
73   /**
74    * Active context; NULL if this peer is not an active peer
75    */
76   struct ActiveContext *ac;
77
78 };
79
80
81 /**
82  * Context for a peer which actively does DHT PUT/GET
83  */
84 struct ActiveContext
85 {
86   /**
87    * The linked peer context
88    */
89   struct Context *ctx;
90
91   /**
92    * Handler to the DHT service
93    */
94   struct GNUNET_DHT_Handle *dht;
95
96   /**
97    * The data used for do a PUT.  Will be NULL if a PUT hasn't been performed yet
98    */
99   void *put_data;
100
101   /**
102    * The active context used for our DHT GET
103    */
104   struct ActiveContext *get_ac;
105
106   /**
107    * The put handle
108    */
109   struct GNUNET_DHT_PutHandle *dht_put;
110
111   /**
112    * The get handle
113    */
114   struct GNUNET_DHT_GetHandle *dht_get;
115
116   /**
117    * The hash of the @e put_data
118    */
119   struct GNUNET_HashCode hash;
120
121   /**
122    * Delay task
123    */
124   struct GNUNET_SCHEDULER_Task * delay_task;
125
126   /**
127    * The size of the @e put_data
128    */
129   uint16_t put_data_size;
130
131   /**
132    * The number of peers currently doing GET on our data
133    */
134   uint16_t nrefs;
135 };
136
137
138 /**
139  * An array of contexts.  The size of this array should be equal to @a num_peers
140  */
141 static struct Context *a_ctx;
142
143 /**
144  * Array of active peers
145  */
146 static struct ActiveContext *a_ac;
147
148 /**
149  * The delay between rounds for collecting statistics
150  */
151 static struct GNUNET_TIME_Relative delay_stats;
152
153 /**
154  * The delay to start puts.
155  */
156 static struct GNUNET_TIME_Relative delay_put;
157
158 /**
159  * The delay to start puts.
160  */
161 static struct GNUNET_TIME_Relative delay_get;
162
163 /**
164  * The timeout for GET and PUT
165  */
166 static struct GNUNET_TIME_Relative timeout;
167
168 /**
169  * Number of peers
170  */
171 static unsigned int num_peers;
172
173 /**
174  * Number of active peers
175  */
176 static unsigned int n_active;
177
178 /**
179  * Number of DHT service connections we currently have
180  */
181 static unsigned int n_dht;
182
183 /**
184  * Number of DHT PUTs made
185  */
186 static unsigned int n_puts;
187
188 /**
189  * Number of DHT PUTs succeeded
190  */
191 static unsigned int n_puts_ok;
192
193 /**
194  * Number of DHT PUTs failed
195  */
196 static unsigned int n_puts_fail;
197
198 /**
199  * Number of DHT GETs made
200  */
201 static unsigned int n_gets;
202
203 /**
204  * Number of DHT GETs succeeded
205  */
206 static unsigned int n_gets_ok;
207
208 /**
209  * Number of DHT GETs succeeded
210  */
211 static unsigned int n_gets_fail;
212
213 /**
214  * Replication degree
215  */
216 static unsigned int replication;
217
218 /**
219  * Number of times we try to find the successor circle formation
220  */
221 static unsigned int max_searches;
222
223 /**
224  * Testbed Operation (to get stats).
225  */
226 static struct GNUNET_TESTBED_Operation *bandwidth_stats_op;
227
228 /**
229  * To get successor stats.
230  */
231 static struct GNUNET_TESTBED_Operation *successor_stats_op;
232
233 /**
234  * Testbed peer handles.
235  */
236 static struct GNUNET_TESTBED_Peer **testbed_handles;
237
238 /**
239  * Total number of messages sent by peer.
240  */
241 static uint64_t outgoing_bandwidth;
242
243 /**
244  * Total number of messages received by peer.
245  */
246 static uint64_t incoming_bandwidth;
247
248 /**
249  * Average number of hops taken to do put.
250  */
251 static double average_put_path_length;
252
253 /**
254  * Average number of hops taken to do get.
255  */
256 static double average_get_path_length;
257
258 /**
259  * Total put path length across all peers.
260  */
261 static unsigned int total_put_path_length;
262
263 /**
264  * Total get path length across all peers.
265  */
266 static unsigned int total_get_path_length;
267
268 /**
269  * Hashmap to store pair of peer and its corresponding successor.
270  */
271 static struct GNUNET_CONTAINER_MultiHashMap *successor_peer_hashmap;
272
273 /**
274  * Key to start the lookup on successor_peer_hashmap.
275  */
276 static struct GNUNET_HashCode *start_key;
277
278 /**
279  * Flag used to get the start_key.
280  */
281 static int flag = 0;
282
283 /**
284  * Task to collect peer and its current successor statistics.
285  */
286 static struct GNUNET_SCHEDULER_Task * successor_stats_task;
287
288 /**
289  * Closure for successor_stats_task.
290  */
291 struct Collect_Stat_Context
292 {
293   /**
294    * Current Peer Context.
295    */
296   struct Context *service_connect_ctx;
297
298   /**
299    * Testbed operation acting on this peer
300    */
301   struct GNUNET_TESTBED_Operation *op;
302 };
303
304 /**
305  * List of all the peers contexts.
306  */
307 struct Context **peer_contexts = NULL;
308
309 /**
310  * Counter to keep track of peers added to peer_context lists.
311  */
312 static int peers_started = 0;
313
314 /**
315  * Should we do a PUT (mode = 0) or GET (mode = 1);
316  */
317 static enum
318 {
319   MODE_PUT = 0,
320
321   MODE_GET = 1
322 } mode;
323
324
325 /**
326  * Are we shutting down
327  */
328 static int in_shutdown = 0;
329
330 /**
331  * Total number of times to check if circle is formed or not.
332  */
333 static unsigned int tries;
334
335
336 /**
337  * Task that collects successor statistics from all the peers.
338  *
339  * @param cls
340  */
341 static void
342 collect_stats (void *cls);
343
344
345 /**
346  * Connect to DHT services of active peers
347  */
348 static void
349 start_profiling (void);
350
351
352 /**
353  * Shutdown task.  Cleanup all resources and operations.
354  *
355  * @param cls NULL
356  */
357 static void
358 do_shutdown (void *cls)
359 {
360   struct ActiveContext *ac;
361   unsigned int cnt;
362
363   in_shutdown = GNUNET_YES;
364   if (NULL != a_ctx)
365   {
366     for (cnt=0; cnt < num_peers; cnt++)
367     {
368       /* Cleanup active context if this peer is an active peer */
369       ac = a_ctx[cnt].ac;
370       if (NULL != ac)
371       {
372         if (NULL != ac->delay_task)
373           GNUNET_SCHEDULER_cancel (ac->delay_task);
374         if (NULL != ac->put_data)
375           GNUNET_free (ac->put_data);
376         if (NULL != ac->dht_put)
377           GNUNET_DHT_put_cancel (ac->dht_put);
378         if (NULL != ac->dht_get)
379           GNUNET_DHT_get_stop (ac->dht_get);
380       }
381       /* Cleanup testbed operation handle at the last as this operation may
382          contain service connection to DHT */
383       if (NULL != a_ctx[cnt].op)
384         GNUNET_TESTBED_operation_done (a_ctx[cnt].op);
385     }
386     GNUNET_free (a_ctx);
387     a_ctx = NULL;
388   }
389   //FIXME: Should we collect stats only for put/get not for other messages.
390   if (NULL != bandwidth_stats_op)
391   {
392     GNUNET_TESTBED_operation_done (bandwidth_stats_op);
393     bandwidth_stats_op = NULL;
394   }
395   if (NULL != successor_stats_op)
396   {
397     GNUNET_TESTBED_operation_done (successor_stats_op);
398     successor_stats_op = NULL;
399   }
400   if (NULL != successor_stats_task)
401   {
402     GNUNET_SCHEDULER_cancel (successor_stats_task);
403     successor_stats_task = NULL;
404   }
405   GNUNET_free_non_null (a_ac);
406 }
407
408
409 /**
410  * Stats callback. Finish the stats testbed operation and when all stats have
411  * been iterated, shutdown the test.
412  *
413  * @param cls closure
414  * @param op the operation that has been finished
415  * @param emsg error message in case the operation has failed; will be NULL if
416  *          operation has executed successfully.
417  */
418 static void
419 bandwidth_stats_cont (void *cls,
420                       struct GNUNET_TESTBED_Operation *op,
421                       const char *emsg)
422 {
423   INFO ("# Outgoing bandwidth: %llu\n",
424         (unsigned long long) outgoing_bandwidth);
425   INFO ("# Incoming bandwidth: %llu\n",
426         (unsigned long long) incoming_bandwidth);
427   GNUNET_SCHEDULER_shutdown ();
428 }
429
430
431 /**
432  * Process statistic values.
433  *
434  * @param cls closure
435  * @param peer the peer the statistic belong to
436  * @param subsystem name of subsystem that created the statistic
437  * @param name the name of the datum
438  * @param value the current value
439  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
440  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
441  */
442 static int
443 bandwidth_stats_iterator (void *cls,
444                           const struct GNUNET_TESTBED_Peer *peer,
445                           const char *subsystem,
446                           const char *name,
447                           uint64_t value,
448                           int is_persistent)
449 {
450    static const char *s_sent = "# Bytes transmitted to other peers";
451    static const char *s_recv = "# Bytes received from other peers";
452
453    if (0 == strncmp (s_sent, name, strlen (s_sent)))
454      outgoing_bandwidth = outgoing_bandwidth + value;
455    else if (0 == strncmp(s_recv, name, strlen (s_recv)))
456      incoming_bandwidth = incoming_bandwidth + value;
457
458     return GNUNET_OK;
459 }
460
461
462 static void
463 summarize ()
464 {
465   INFO ("# PUTS made: %u\n", n_puts);
466   INFO ("# PUTS succeeded: %u\n", n_puts_ok);
467   INFO ("# PUTS failed: %u\n", n_puts_fail);
468   INFO ("# GETS made: %u\n", n_gets);
469   INFO ("# GETS succeeded: %u\n", n_gets_ok);
470   INFO ("# GETS failed: %u\n", n_gets_fail);
471   INFO ("# average_put_path_length: %f\n", average_put_path_length);
472   INFO ("# average_get_path_length: %f\n", average_get_path_length);
473
474   if (NULL == testbed_handles)
475   {
476     INFO ("No peers found\n");
477     return;
478   }
479   /* Collect Stats*/
480   bandwidth_stats_op = GNUNET_TESTBED_get_statistics (n_active, testbed_handles,
481                                                       "dht", NULL,
482                                                        bandwidth_stats_iterator,
483                                                        bandwidth_stats_cont, NULL);
484 }
485
486
487 /**
488  * Task to cancel DHT GET.
489  *
490  * @param cls NULL
491  */
492 static void
493 cancel_get (void *cls)
494 {
495   struct ActiveContext *ac = cls;
496   struct Context *ctx = ac->ctx;
497
498   ac->delay_task = NULL;
499   GNUNET_assert (NULL != ac->dht_get);
500   GNUNET_DHT_get_stop (ac->dht_get);
501   ac->dht_get = NULL;
502   n_gets_fail++;
503   GNUNET_assert (NULL != ctx->op);
504   GNUNET_TESTBED_operation_done (ctx->op);
505   ctx->op = NULL;
506
507   /* If profiling is complete, summarize */
508   if (n_active == n_gets_fail + n_gets_ok)
509   {
510     average_put_path_length = (double)total_put_path_length/(double)n_active;
511     average_get_path_length = (double)total_get_path_length/(double )n_gets_ok;
512     summarize ();
513   }
514 }
515
516
517 /**
518  * Iterator called on each result obtained for a DHT
519  * operation that expects a reply
520  *
521  * @param cls closure
522  * @param exp when will this value expire
523  * @param key key of the result
524  * @param get_path peers on reply path (or NULL if not recorded)
525  *                 [0] = datastore's first neighbor, [length - 1] = local peer
526  * @param get_path_length number of entries in @a get_path
527  * @param put_path peers on the PUT path (or NULL if not recorded)
528  *                 [0] = origin, [length - 1] = datastore
529  * @param put_path_length number of entries in @a put_path
530  * @param type type of the result
531  * @param size number of bytes in @a data
532  * @param data pointer to the result data
533  */
534 static void
535 get_iter (void *cls,
536           struct GNUNET_TIME_Absolute exp,
537           const struct GNUNET_HashCode *key,
538           const struct GNUNET_PeerIdentity *get_path,
539           unsigned int get_path_length,
540           const struct GNUNET_PeerIdentity *put_path,
541           unsigned int put_path_length,
542           enum GNUNET_BLOCK_Type type,
543           size_t size, const void *data)
544 {
545   struct ActiveContext *ac = cls;
546   struct ActiveContext *get_ac = ac->get_ac;
547   struct Context *ctx = ac->ctx;
548
549   /* Check the keys of put and get match or not. */
550   GNUNET_assert (0 == memcmp (key, &get_ac->hash, sizeof (struct GNUNET_HashCode)));
551   /* we found the data we are looking for */
552   DEBUG ("We found a GET request; %u remaining\n", n_gets - (n_gets_fail + n_gets_ok)); //FIXME: It always prints 1.
553   n_gets_ok++;
554   get_ac->nrefs--;
555   GNUNET_DHT_get_stop (ac->dht_get);
556   ac->dht_get = NULL;
557   if (ac->delay_task != NULL)
558     GNUNET_SCHEDULER_cancel (ac->delay_task);
559   ac->delay_task = NULL;
560   GNUNET_assert (NULL != ctx->op);
561   GNUNET_TESTBED_operation_done (ctx->op);
562   ctx->op = NULL;
563
564   total_put_path_length = total_put_path_length + (double)put_path_length;
565   total_get_path_length = total_get_path_length + (double)get_path_length;
566   DEBUG ("total_put_path_length = %u,put_path \n",
567          total_put_path_length);
568   /* Summarize if profiling is complete */
569   if (n_active == n_gets_fail + n_gets_ok)
570   {
571     average_put_path_length = (double)total_put_path_length/(double)n_active;
572     average_get_path_length = (double)total_get_path_length/(double )n_gets_ok;
573     summarize ();
574   }
575 }
576
577
578 /**
579  * Task to do DHT GETs
580  *
581  * @param cls the active context
582  */
583 static void
584 delayed_get (void *cls)
585 {
586   struct ActiveContext *ac = cls;
587   struct ActiveContext *get_ac;
588   unsigned int r;
589
590   ac->delay_task = NULL;
591   get_ac = NULL;
592   while (1)
593   {
594     r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, n_active);
595     get_ac = &a_ac[r];
596     if (NULL != get_ac->put_data)
597       break;
598   }
599   get_ac->nrefs++;
600   ac->get_ac = get_ac;
601   DEBUG ("GET_REQUEST_START key %s \n", GNUNET_h2s((struct GNUNET_HashCode *)ac->put_data));
602   ac->dht_get = GNUNET_DHT_get_start (ac->dht,
603                                       GNUNET_BLOCK_TYPE_TEST,
604                                       &get_ac->hash,
605                                       1, /* replication level */
606                                       GNUNET_DHT_RO_NONE,
607                                       NULL, 0, /* extended query and size */
608                                       get_iter, ac); /* GET iterator and closure
609                                                         */
610   n_gets++;
611
612   /* schedule the timeout task for GET */
613   ac->delay_task = GNUNET_SCHEDULER_add_delayed (timeout, &cancel_get, ac);
614 }
615
616
617 /**
618  * Task to teardown the dht connection.  We do it as a task because calling
619  * GNUNET_DHT_disconnect() from put_continutation_callback seems illegal (the
620  * put_continuation_callback() is getting called again synchronously).  Also,
621  * only free the operation when we are not shutting down; the shutdown task will
622  * clear the operation during shutdown.
623  *
624  * @param cls the context
625  */
626 static void
627 teardown_dht_connection (void *cls)
628 {
629   struct Context *ctx = cls;
630   struct GNUNET_TESTBED_Operation *op;
631
632   GNUNET_assert (NULL != ctx);
633   GNUNET_assert (NULL != (op = ctx->op));
634   ctx->op = NULL;
635   GNUNET_TESTBED_operation_done (op);
636 }
637
638
639 /**
640  * Queue up a delayed task for doing DHT GET
641  *
642  * @param cls the active context
643  * @param success #GNUNET_OK if the PUT was transmitted,
644  *                #GNUNET_NO on timeout,
645  *                #GNUNET_SYSERR on disconnect from service
646  *                after the PUT message was transmitted
647  *                (so we don't know if it was received or not)
648  */
649 static void
650 put_cont (void *cls, int success)
651 {
652   struct ActiveContext *ac = cls;
653   struct Context *ctx = ac->ctx;
654
655   ac->dht_put = NULL;
656   if (success)
657     n_puts_ok++;
658   else
659     n_puts_fail++;
660   GNUNET_assert (NULL != ctx);
661   (void) GNUNET_SCHEDULER_add_now (&teardown_dht_connection, ctx);
662 }
663
664
665 /**
666  * Task to do DHT PUTs
667  *
668  * @param cls the active context
669  */
670 static void
671 delayed_put (void *cls)
672 {
673   struct ActiveContext *ac = cls;
674
675   ac->delay_task = NULL;
676   /* Generate and DHT PUT some random data */
677   ac->put_data_size = 16;       /* minimum */
678   ac->put_data_size += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
679                                                  (63*1024));
680   ac->put_data = GNUNET_malloc (ac->put_data_size);
681   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK,
682                               ac->put_data, ac->put_data_size);
683   GNUNET_CRYPTO_hash (ac->put_data, ac->put_data_size, &ac->hash);
684   DEBUG ("PUT_REQUEST_START key %s\n",
685          GNUNET_h2s((struct GNUNET_HashCode *)ac->put_data));
686   ac->dht_put = GNUNET_DHT_put (ac->dht,
687                                 &ac->hash,
688                                 replication,
689                                 GNUNET_DHT_RO_RECORD_ROUTE,
690                                 GNUNET_BLOCK_TYPE_TEST,
691                                 ac->put_data_size,
692                                 ac->put_data,
693                                 GNUNET_TIME_UNIT_FOREVER_ABS, /* expiration time */
694                                 &put_cont, ac);                /* continuation and its closure */
695   n_puts++;
696 }
697
698
699 /**
700  * Connection to DHT has been established.  Call the delay task.
701  *
702  * @param cls the active context
703  * @param op the operation that has been finished
704  * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
705  * @param emsg error message in case the operation has failed; will be NULL if
706  *          operation has executed successfully.
707  */
708 static void
709 dht_connected (void *cls,
710                struct GNUNET_TESTBED_Operation *op,
711                void *ca_result,
712                const char *emsg)
713 {
714   struct ActiveContext *ac = cls;
715   struct Context *ctx = ac->ctx;
716
717   GNUNET_assert (NULL != ctx); //FIXME: Fails
718   GNUNET_assert (NULL != ctx->op);
719   GNUNET_assert (ctx->op == op);
720   ac->dht = (struct GNUNET_DHT_Handle *) ca_result;
721   if (NULL != emsg)
722   {
723     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Connection to DHT service failed: %s\n", emsg);
724     GNUNET_TESTBED_operation_done (ctx->op); /* Calls dht_disconnect() */
725     ctx->op = NULL;
726     return;
727   }
728   switch (mode)
729   {
730   case MODE_PUT:
731   {
732     struct GNUNET_TIME_Relative peer_delay_put;
733     peer_delay_put.rel_value_us =
734       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
735                                 delay_put.rel_value_us);
736     ac->delay_task = GNUNET_SCHEDULER_add_delayed (peer_delay_put, &delayed_put, ac);
737     break;
738   }
739   case MODE_GET:
740   {
741     struct GNUNET_TIME_Relative peer_delay_get;
742     peer_delay_get.rel_value_us =
743       delay_get.rel_value_us +
744       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
745                                 delay_get.rel_value_us);
746     ac->delay_task = GNUNET_SCHEDULER_add_delayed (peer_delay_get, &delayed_get, ac);
747     break;
748   }
749   }
750 }
751
752
753 /**
754  * Connect to DHT service and return the DHT client handler
755  *
756  * @param cls the active context
757  * @param cfg configuration of the peer to connect to; will be available until
758  *          GNUNET_TESTBED_operation_done() is called on the operation returned
759  *          from GNUNET_TESTBED_service_connect()
760  * @return service handle to return in 'op_result', NULL on error
761  */
762 static void *
763 dht_connect (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
764 {
765   n_dht++;
766   return GNUNET_DHT_connect (cfg, 10);
767 }
768
769
770 /**
771  * Adapter function called to destroy a connection to
772  * a service.
773  *
774  * @param cls the active context
775  * @param op_result service handle returned from the connect adapter
776  */
777 static void
778 dht_disconnect (void *cls, void *op_result)
779 {
780   struct ActiveContext *ac = cls;
781
782   GNUNET_assert (NULL != ac->dht);
783   GNUNET_assert (ac->dht == op_result);
784   GNUNET_DHT_disconnect (ac->dht);
785   ac->dht = NULL;
786   n_dht--;
787   if (0 != n_dht)
788     return;
789   if (GNUNET_YES == in_shutdown)
790     return;
791   switch (mode)
792   {
793   case MODE_PUT:
794     if ((n_puts_ok + n_puts_fail) != n_active)
795       return;
796     /* Start GETs if all PUTs have been made */
797     mode = MODE_GET;
798     //(void) GNUNET_SCHEDULER_add_now (&call_start_profiling, NULL);
799     start_profiling ();
800     return;
801   case MODE_GET:
802     if ((n_gets_ok + n_gets_fail) != n_active)
803       return;
804     break;
805   }
806 }
807
808 /**
809  * Connect to DHT services of active peers
810  */
811 static void
812 start_profiling()
813 {
814   struct Context *ctx;
815   unsigned int i;
816
817   DEBUG("GNUNET_TESTBED_service_connect \n");
818   GNUNET_break (GNUNET_YES != in_shutdown);
819   for(i = 0; i < n_active; i++)
820   {
821     struct ActiveContext *ac = &a_ac[i];
822     GNUNET_assert (NULL != (ctx = ac->ctx));
823     GNUNET_assert (NULL == ctx->op);
824     ctx->op =
825         GNUNET_TESTBED_service_connect (ctx,
826                                         ctx->peer,
827                                         "dht",
828                                         &dht_connected, ac,
829                                         &dht_connect,
830                                         &dht_disconnect,
831                                         ac);
832   }
833 }
834
835 /**
836  * Start collecting relevant statistics. If ENABLE_MALICIOUS set, first
837  * set the malicious peers. If not, then start with PUT operation on active
838  * peers.
839  */
840 static void
841 start_func()
842 {
843   start_profiling();
844 }
845
846
847 /**
848  * Remove entry from successor peer hashmap.
849  * @param cls closure
850  * @param key current public key
851  * @param value value in the hash map
852  * @return #GNUNET_YES if we should continue to iterate,
853  *         #GNUNET_NO if not.
854  */
855 static int
856 hashmap_iterate_remove(void *cls,
857                        const struct GNUNET_HashCode *key,
858                        void *value)
859 {
860   GNUNET_assert (GNUNET_YES ==
861                 GNUNET_CONTAINER_multihashmap_remove(successor_peer_hashmap, key, value));
862   return GNUNET_YES;
863 }
864
865
866 /**
867  * Stats callback. Iterate over the hashmap and check if all th peers form
868  * a virtual ring topology.
869  *
870  * @param cls closure
871  * @param op the operation that has been finished
872  * @param emsg error message in case the operation has failed; will be NULL if
873  *          operation has executed successfully.
874  */
875 static void
876 successor_stats_cont (void *cls,
877                       struct GNUNET_TESTBED_Operation *op,
878                       const char *emsg)
879 {
880   struct GNUNET_HashCode *val;
881   struct GNUNET_HashCode *start_val;
882   struct GNUNET_HashCode *key;
883   int count;
884
885   /* Don't schedule the task till we are looking for circle here. */
886   successor_stats_task = NULL;
887   GNUNET_TESTBED_operation_done (successor_stats_op);
888   successor_stats_op = NULL;
889   if (0 == max_searches)
890   {
891     start_func ();
892     return;
893   }
894
895   GNUNET_assert (NULL != start_key);
896   start_val = GNUNET_CONTAINER_multihashmap_get (successor_peer_hashmap,
897                                                  start_key);
898   GNUNET_assert (NULL != start_val);
899   val = start_val;
900   for (count = 0; count < num_peers; count++)
901   {
902     key = val;
903     val = GNUNET_CONTAINER_multihashmap_get (successor_peer_hashmap,
904                                              key);
905     if (NULL == val)
906       break;
907     /* Remove the entry from hashmap. This is done to take care of loop. */
908     if (GNUNET_NO ==
909         GNUNET_CONTAINER_multihashmap_remove (successor_peer_hashmap,
910                                               key, val))
911     {
912       DEBUG ("Failed to remove entry from hashmap\n");
913       break;
914     }
915     /* If a peer has its own identity as its successor. */
916     if (0 == memcmp(key, val, sizeof (struct GNUNET_HashCode)))
917       break;
918   }
919
920   GNUNET_assert (GNUNET_SYSERR !=
921                  GNUNET_CONTAINER_multihashmap_iterate (successor_peer_hashmap,
922                                                         &hashmap_iterate_remove,
923                                                         NULL));
924
925   successor_peer_hashmap = GNUNET_CONTAINER_multihashmap_create (num_peers,
926                                                                  GNUNET_NO);
927   if ((start_val == val) && (count == num_peers))
928   {
929     DEBUG ("CIRCLE COMPLETED after %u tries", tries);
930     if(NULL == successor_stats_task)
931     {
932       start_func();
933     }
934     return;
935   }
936   if (max_searches == ++tries)
937   {
938     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939                 "Maximum tries %u exceeded while checking successor TOTAL TRIES %u"
940                 " circle formation.  Exiting\n",
941                 max_searches,tries);
942     start_func();
943     return;
944   }
945   flag = 0;
946   successor_stats_task
947     = GNUNET_SCHEDULER_add_delayed (delay_stats,
948                                     &collect_stats,
949                                     cls);
950 }
951
952
953 /**
954  * Process successor statistic values.
955  *
956  * @param cls closure
957  * @param peer the peer the statistic belong to
958  * @param subsystem name of subsystem that created the statistic
959  * @param name the name of the datum
960  * @param value the current value
961  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
962  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
963  */
964 static int
965 successor_stats_iterator (void *cls,
966                           const struct GNUNET_TESTBED_Peer *peer,
967                           const char *subsystem,
968                           const char *name,
969                           uint64_t value,
970                           int is_persistent)
971 {
972   static const char *key_string = "XDHT";
973   if (0 == max_searches)
974     return GNUNET_OK;
975
976   if (0 == strncmp (key_string, name, strlen (key_string)))
977   {
978     char *my_id_str;
979     char successor_str[13];
980     char truncated_my_id_str[13];
981     char truncated_successor_str[13];
982     struct GNUNET_HashCode *my_id_key;
983     struct GNUNET_HashCode *succ_key;
984
985     strtok((char *)name,":");
986     my_id_str = strtok(NULL,":");
987
988     strncpy(truncated_my_id_str, my_id_str, 12);
989     truncated_my_id_str[12] = '\0';
990     my_id_key = GNUNET_new(struct GNUNET_HashCode);
991     GNUNET_CRYPTO_hash (truncated_my_id_str, sizeof(truncated_my_id_str),my_id_key);
992     GNUNET_STRINGS_data_to_string(&value, sizeof(uint64_t), successor_str, 13);
993     strncpy(truncated_successor_str, successor_str, 12);
994     truncated_successor_str[12] ='\0';
995
996     succ_key = GNUNET_new(struct GNUNET_HashCode);
997     GNUNET_CRYPTO_hash (truncated_successor_str, sizeof(truncated_successor_str),succ_key);
998
999     if (0 == flag)
1000     {
1001       GNUNET_assert(NULL != my_id_key);
1002       start_key = my_id_key;
1003       GNUNET_assert(NULL != start_key);
1004       flag = 1;
1005     }
1006     GNUNET_CONTAINER_multihashmap_put (successor_peer_hashmap,
1007                                        my_id_key, (void *)succ_key,
1008                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1009   }
1010
1011   return GNUNET_OK;
1012 }
1013
1014
1015 /*
1016  * Task that collects peer and its corresponding successors.
1017  *
1018  * @param cls Closure (NULL).
1019  */
1020 static void
1021 collect_stats (void *cls)
1022 {
1023   successor_stats_task = NULL;
1024   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1025               "Start collecting statistics...\n");
1026   GNUNET_assert(NULL != testbed_handles);
1027
1028   if (0 != max_searches)
1029     successor_peer_hashmap
1030       = GNUNET_CONTAINER_multihashmap_create (num_peers,
1031                                               GNUNET_NO);
1032   successor_stats_op
1033     = GNUNET_TESTBED_get_statistics (num_peers, testbed_handles,
1034                                      "dht", NULL,
1035                                      successor_stats_iterator,
1036                                      successor_stats_cont, cls);
1037   GNUNET_assert (NULL != successor_stats_op);
1038 }
1039
1040
1041 /**
1042  * Callback called when DHT service on the peer is started
1043  *
1044  * @param cls the context
1045  * @param op the operation that has been finished
1046  * @param emsg error message in case the operation has failed; will be NULL if
1047  *          operation has executed successfully.
1048  */
1049 static void
1050 service_started (void *cls,
1051                  struct GNUNET_TESTBED_Operation *op,
1052                  const char *emsg)
1053 {
1054   struct Context *ctx = cls;
1055
1056   GNUNET_assert (NULL != ctx);
1057   GNUNET_assert (NULL != ctx->op);
1058   GNUNET_TESTBED_operation_done (ctx->op);
1059   ctx->op = NULL;
1060   peers_started++;
1061   DEBUG("Peers Started = %d; num_peers = %d \n", peers_started, num_peers);
1062   if (NULL == successor_stats_task && peers_started == num_peers)
1063   {
1064      DEBUG("successor_stats_task \n");
1065      struct Collect_Stat_Context *collect_stat_cls = GNUNET_new(struct Collect_Stat_Context);
1066      collect_stat_cls->service_connect_ctx = cls;
1067      collect_stat_cls->op = op;
1068
1069      successor_stats_task
1070        = GNUNET_SCHEDULER_add_delayed (delay_stats,
1071                                        &collect_stats,
1072                                        collect_stat_cls);
1073   }
1074 }
1075
1076
1077 /**
1078  * Signature of a main function for a testcase.
1079  *
1080  * @param cls closure
1081  * @param h the run handle
1082  * @param num_peers number of peers in 'peers'
1083  * @param peers handle to peers run in the testbed
1084  * @param links_succeeded the number of overlay link connection attempts that
1085  *          succeeded
1086  * @param links_failed the number of overlay link
1087  */
1088 static void
1089 test_run (void *cls,
1090           struct GNUNET_TESTBED_RunHandle *h,
1091           unsigned int num_peers, struct GNUNET_TESTBED_Peer **peers,
1092           unsigned int links_succeeded,
1093           unsigned int links_failed)
1094 {
1095   unsigned int cnt;
1096   unsigned int ac_cnt;
1097   testbed_handles = peers;
1098   if (NULL == peers)
1099   {
1100     /* exit */
1101     GNUNET_assert (0);
1102   }
1103   INFO ("%u peers started\n", num_peers);
1104   a_ctx = GNUNET_malloc (sizeof (struct Context) * num_peers);
1105
1106   /* select the peers which actively participate in profiling */
1107   n_active = num_peers * PUT_PROBABILITY / 100;
1108   if (0 == n_active)
1109   {
1110     GNUNET_SCHEDULER_shutdown ();
1111     GNUNET_free (a_ctx);
1112     return;
1113   }
1114
1115   a_ac = GNUNET_malloc (n_active * sizeof (struct ActiveContext));
1116   ac_cnt = 0;
1117   for (cnt = 0; cnt < num_peers && ac_cnt < n_active; cnt++)
1118   {
1119     if ((GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100) >=
1120         PUT_PROBABILITY))
1121       continue;
1122
1123     a_ctx[cnt].ac = &a_ac[ac_cnt];
1124     a_ac[ac_cnt].ctx = &a_ctx[cnt];
1125     ac_cnt++;
1126   }
1127   n_active = ac_cnt;
1128   INFO ("Active peers: %u\n", n_active);
1129
1130   /* start DHT service on all peers */
1131   for (cnt = 0; cnt < num_peers; cnt++)
1132   {
1133     a_ctx[cnt].peer = peers[cnt];
1134     a_ctx[cnt].op = GNUNET_TESTBED_peer_manage_service (&a_ctx[cnt],
1135                                                         peers[cnt],
1136                                                         "dht",
1137                                                         &service_started,
1138                                                         &a_ctx[cnt],
1139                                                         1);
1140   }
1141 }
1142
1143
1144 /**
1145  * Main function that will be run by the scheduler.
1146  *
1147  * @param cls closure
1148  * @param args remaining command-line arguments
1149  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
1150  * @param config configuration
1151  */
1152 static void
1153 run (void *cls, char *const *args, const char *cfgfile,
1154      const struct GNUNET_CONFIGURATION_Handle *config)
1155 {
1156   uint64_t event_mask;
1157
1158   if (0 == num_peers)
1159   {
1160     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1161                 _("Exiting as the number of peers is %u\n"),
1162                 num_peers);
1163     return;
1164   }
1165   cfg = GNUNET_CONFIGURATION_dup (config);
1166   event_mask = 0;
1167   GNUNET_TESTBED_run (hosts_file, cfg, num_peers, event_mask, NULL,
1168                       NULL, &test_run, NULL);
1169   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1170                                  NULL);
1171 }
1172
1173
1174 /**
1175  * Main function.
1176  *
1177  * @return 0 on success
1178  */
1179 int
1180 main (int argc, char *const *argv)
1181 {
1182   int rc;
1183
1184   static struct GNUNET_GETOPT_CommandLineOption options[] = {
1185     {'n', "peers", "COUNT",
1186      gettext_noop ("number of peers to start"),
1187      1, &GNUNET_GETOPT_set_uint, &num_peers},
1188     {'s', "searches", "COUNT",
1189      gettext_noop ("maximum number of times we try to search for successor circle formation (0 for R5N)"),
1190      1, &GNUNET_GETOPT_set_uint, &max_searches},
1191     {'H', "hosts", "FILENAME",
1192      gettext_noop ("name of the file with the login information for the testbed"),
1193      1, &GNUNET_GETOPT_set_string, &hosts_file},
1194     {'D', "delay", "DELAY",
1195      gettext_noop ("delay between rounds for collecting statistics (default: 30 sec)"),
1196      1, &GNUNET_GETOPT_set_relative_time, &delay_stats},
1197     {'P', "PUT-delay", "DELAY",
1198      gettext_noop ("delay to start doing PUTs (default: 1 sec)"),
1199      1, &GNUNET_GETOPT_set_relative_time, &delay_put},
1200     {'G', "GET-delay", "DELAY",
1201      gettext_noop ("delay to start doing GETs (default: 5 min)"),
1202      1, &GNUNET_GETOPT_set_relative_time, &delay_get},
1203     {'r', "replication", "DEGREE",
1204      gettext_noop ("replication degree for DHT PUTs"),
1205      1, &GNUNET_GETOPT_set_uint, &replication},
1206     {'t', "timeout", "TIMEOUT",
1207      gettext_noop ("timeout for DHT PUT and GET requests (default: 1 min)"),
1208      1, &GNUNET_GETOPT_set_relative_time, &timeout},
1209     GNUNET_GETOPT_OPTION_END
1210   };
1211
1212   max_searches = 5;
1213   if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
1214     return 2;
1215   /* set default delays */
1216   delay_stats = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
1217   delay_put = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
1218   delay_get = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
1219   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
1220   replication = 1;      /* default replication */
1221   rc = 0;
1222   if (GNUNET_OK !=
1223       GNUNET_PROGRAM_run (argc, argv, "dht-profiler",
1224                           gettext_noop
1225                           ("Measure quality and performance of the DHT service."),
1226                           options, &run, NULL))
1227     rc = 1;
1228   return rc;
1229 }