first batch of license fixes (boring)
[oweals/gnunet.git] / src / rps / gnunet-rps-profiler.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2012 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15 /**
16  * @file rps/test_rps.c
17  * @brief Testcase for the random peer sampling service.  Starts
18  *        a peergroup with a given number of peers, then waits to
19  *        receive size pushes/pulls from each peer.  Expects to wait
20  *        for one message from each peer.
21  */
22 #include "platform.h"
23 //#include "rps_test_lib.h"
24 #include "gnunet_util_lib.h"
25 #include "gnunet_testbed_service.h"
26
27 #include "gnunet_rps_service.h"
28 #include "rps-test_util.h"
29 #include "gnunet-service-rps_sampler_elem.h"
30
31 #include <inttypes.h>
32
33
34 /**
35  * How many peers do we start?
36  */
37 static uint32_t num_peers;
38
39 /**
40  * How long do we run the test?
41  * In seconds.
42  */
43 static uint32_t timeout_s;
44
45 /**
46  * How long do we run the test?
47  */
48 //#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
49 static struct GNUNET_TIME_Relative timeout;
50
51
52 /**
53  * Portion of malicious peers
54  */
55 static double portion = .1;
56
57 /**
58  * Type of malicious peer to test
59  */
60 static unsigned int mal_type = 0;
61
62 /**
63  * Handles to all of the running peers
64  */
65 static struct GNUNET_TESTBED_Peer **testbed_peers;
66
67 /**
68  * @brief Indicates whether peer should go off- or online
69  */
70 enum PEER_ONLINE_DELTA {
71   /**
72    * @brief Indicates peer going online
73    */
74   PEER_GO_ONLINE = 1,
75   /**
76    * @brief Indicates peer going offline
77    */
78   PEER_GO_OFFLINE = -1,
79 };
80
81 /**
82  * Operation map entry
83  */
84 struct OpListEntry
85 {
86   /**
87    * DLL next ptr
88    */
89   struct OpListEntry *next;
90
91   /**
92    * DLL prev ptr
93    */
94   struct OpListEntry *prev;
95
96   /**
97    * The testbed operation
98    */
99   struct GNUNET_TESTBED_Operation *op;
100
101   /**
102    * Depending on whether we start or stop RPS service at the peer, set this to
103    * #PEER_GO_ONLINE (1) or #PEER_GO_OFFLINE (-1)
104    */
105   enum PEER_ONLINE_DELTA delta;
106
107   /**
108    * Index of the regarding peer
109    */
110   unsigned int index;
111 };
112
113 /**
114  * OpList DLL head
115  */
116 static struct OpListEntry *oplist_head;
117
118 /**
119  * OpList DLL tail
120  */
121 static struct OpListEntry *oplist_tail;
122
123
124 /**
125  * A pending reply: A request was sent and the reply is pending.
126  */
127 struct PendingReply
128 {
129   /**
130    * DLL next,prev ptr
131    */
132   struct PendingReply *next;
133   struct PendingReply *prev;
134
135   /**
136    * Handle to the request we are waiting for
137    */
138   struct GNUNET_RPS_Request_Handle *req_handle;
139
140   /**
141    * The peer that requested
142    */
143   struct RPSPeer *rps_peer;
144 };
145
146
147 /**
148  * A pending request: A request was not made yet but is scheduled for later.
149  */
150 struct PendingRequest
151 {
152   /**
153    * DLL next,prev ptr
154    */
155   struct PendingRequest *next;
156   struct PendingRequest *prev;
157
158   /**
159    * Handle to the request we are waiting for
160    */
161   struct GNUNET_SCHEDULER_Task *request_task;
162
163   /**
164    * The peer that requested
165    */
166   struct RPSPeer *rps_peer;
167 };
168
169
170 /**
171  * Information we track for each peer.
172  */
173 struct RPSPeer
174 {
175   /**
176    * Index of the peer.
177    */
178   unsigned int index;
179
180   /**
181    * Handle for RPS connect operation.
182    */
183   struct GNUNET_TESTBED_Operation *op;
184
185   /**
186    * Handle to RPS service.
187    */
188   struct GNUNET_RPS_Handle *rps_handle;
189
190   /**
191    * ID of the peer.
192    */
193   struct GNUNET_PeerIdentity *peer_id;
194
195   /**
196    * A request handle to check for an request
197    */
198   //struct GNUNET_RPS_Request_Handle *req_handle;
199
200   /**
201    * Peer on- or offline?
202    */
203   int online;
204
205   /**
206    * Number of Peer IDs to request during the whole test
207    */
208   unsigned int num_ids_to_request;
209
210   /**
211    * Pending requests DLL
212    */
213   struct PendingRequest *pending_req_head;
214   struct PendingRequest *pending_req_tail;
215
216   /**
217    * Number of pending requests
218    */
219   unsigned int num_pending_reqs;
220
221   /**
222    * Pending replies DLL
223    */
224   struct PendingReply *pending_rep_head;
225   struct PendingReply *pending_rep_tail;
226
227   /**
228    * Number of pending replies
229    */
230   unsigned int num_pending_reps;
231
232   /**
233    * Number of received PeerIDs
234    */
235   unsigned int num_recv_ids;
236
237   /**
238    * Pending operation on that peer
239    */
240   const struct OpListEntry *entry_op_manage;
241
242   /**
243    * Testbed operation to connect to statistics service
244    */
245   struct GNUNET_TESTBED_Operation *stat_op;
246
247   /**
248    * Handle to the statistics service
249    */
250   struct GNUNET_STATISTICS_Handle *stats_h;
251
252   /**
253    * @brief flags to indicate which statistics values have been already
254    * collected from the statistics service.
255    * Used to check whether we are able to shutdown.
256    */
257   uint32_t stat_collected_flags;
258
259   /**
260    * @brief File name of the file the stats are finally written to
261    */
262   const char *file_name_stats;
263
264   /**
265    * @brief File name of the file the stats are finally written to
266    */
267   const char *file_name_probs;
268
269   /**
270    * @brief The current view
271    */
272   struct GNUNET_PeerIdentity *cur_view;
273
274   /**
275    * @brief Number of peers in the #cur_view.
276    */
277   uint32_t cur_view_count;
278
279   /**
280    * @brief Number of occurrences in other peer's view
281    */
282   uint32_t count_in_views;
283
284   /**
285    * @brief statistics values
286    */
287   uint64_t num_rounds;
288   uint64_t num_blocks;
289   uint64_t num_blocks_many_push;
290   uint64_t num_blocks_no_push;
291   uint64_t num_blocks_no_pull;
292   uint64_t num_blocks_many_push_no_pull;
293   uint64_t num_blocks_no_push_no_pull;
294   uint64_t num_issued_push;
295   uint64_t num_issued_pull_req;
296   uint64_t num_issued_pull_rep;
297   uint64_t num_sent_push;
298   uint64_t num_sent_pull_req;
299   uint64_t num_sent_pull_rep;
300   uint64_t num_recv_push;
301   uint64_t num_recv_pull_req;
302   uint64_t num_recv_pull_rep;
303 };
304
305 enum STAT_TYPE
306 {
307   STAT_TYPE_ROUNDS                    =    0x1, /*   1 */
308   STAT_TYPE_BLOCKS                    =    0x2, /*   2 */
309   STAT_TYPE_BLOCKS_MANY_PUSH          =    0x4, /*   3 */
310   STAT_TYPE_BLOCKS_NO_PUSH            =    0x8, /*   4 */
311   STAT_TYPE_BLOCKS_NO_PULL            =   0x10, /*   5 */
312   STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL  =   0x20, /*   6 */
313   STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL    =   0x40, /*   7 */
314   STAT_TYPE_ISSUED_PUSH_SEND          =   0x80, /*   8 */
315   STAT_TYPE_ISSUED_PULL_REQ           =  0x100, /*   9 */
316   STAT_TYPE_ISSUED_PULL_REP           =  0x200, /*  10 */
317   STAT_TYPE_SENT_PUSH_SEND            =  0x400, /*  11 */
318   STAT_TYPE_SENT_PULL_REQ             =  0x800, /*  12 */
319   STAT_TYPE_SENT_PULL_REP             = 0x1000, /*  13 */
320   STAT_TYPE_RECV_PUSH_SEND            = 0x2000, /*  14 */
321   STAT_TYPE_RECV_PULL_REQ             = 0x4000, /*  15 */
322   STAT_TYPE_RECV_PULL_REP             = 0x8000, /*  16 */
323   STAT_TYPE_MAX          = 0x80000000, /*  32 */
324 };
325
326 struct STATcls
327 {
328   struct RPSPeer *rps_peer;
329   enum STAT_TYPE stat_type;
330 };
331
332
333 /**
334  * Information for all the peers.
335  */
336 static struct RPSPeer *rps_peers;
337
338 /**
339  * Peermap to get the index of a given peer ID quick.
340  */
341 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
342
343 /**
344  * IDs of the peers.
345  */
346 static struct GNUNET_PeerIdentity *rps_peer_ids;
347
348 /**
349  * ID of the targeted peer.
350  */
351 static struct GNUNET_PeerIdentity *target_peer;
352
353 /**
354  * ID of the peer that requests for the evaluation.
355  */
356 static struct RPSPeer *eval_peer;
357
358 /**
359  * Number of online peers.
360  */
361 static unsigned int num_peers_online;
362
363 /**
364  * @brief The added sizes of the peer's views
365  */
366 static unsigned int view_sizes;
367
368 /**
369  * Return value from 'main'.
370  */
371 static int ok;
372
373 /**
374  * Identifier for the churn task that runs periodically
375  */
376 static struct GNUNET_SCHEDULER_Task *post_test_task;
377
378 /**
379  * Identifier for the churn task that runs periodically
380  */
381 static struct GNUNET_SCHEDULER_Task *shutdown_task;
382
383 /**
384  * Identifier for the churn task that runs periodically
385  */
386 static struct GNUNET_SCHEDULER_Task *churn_task;
387
388 /**
389  * Called to initialise the given RPSPeer
390  */
391 typedef void (*InitPeer) (struct RPSPeer *rps_peer);
392
393 /**
394  * @brief Called directly after connecting to the service
395  *
396  * @param rps_peer Specific peer the function is called on
397  * @param h the handle to the rps service
398  */
399 typedef void (*PreTest) (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h);
400
401 /**
402  * @brief Executes functions to test the api/service for a given peer
403  *
404  * Called from within #rps_connect_complete_cb ()
405  * Implemented by #churn_test_cb, #profiler_cb, #mal_cb, #single_req_cb,
406  * #delay_req_cb, #seed_big_cb, #single_peer_seed_cb, #seed_cb, #req_cancel_cb
407  *
408  * @param rps_peer the peer the task runs on
409  */
410 typedef void (*MainTest) (struct RPSPeer *rps_peer);
411
412 /**
413  * Callback called once the requested random peers are available
414  */
415 typedef void (*ReplyHandle) (void *cls,
416                              uint64_t n,
417                              const struct GNUNET_PeerIdentity *recv_peers);
418
419 /**
420  * Called directly before disconnecting from the service
421  */
422 typedef void (*PostTest) (struct RPSPeer *peer);
423
424 /**
425  * Function called after disconnect to evaluate test success
426  */
427 typedef int (*EvaluationCallback) (void);
428
429 /**
430  * @brief Do we have Churn?
431  */
432 enum OPTION_CHURN {
433   /**
434    * @brief If we have churn this is set
435    */
436   HAVE_CHURN,
437   /**
438    * @brief If we have no churn this is set
439    */
440   HAVE_NO_CHURN,
441 };
442
443 /**
444  * @brief Is it ok to quit the test before the timeout?
445  */
446 enum OPTION_QUICK_QUIT {
447   /**
448    * @brief It is ok for the test to quit before the timeout triggers
449    */
450   HAVE_QUICK_QUIT,
451
452   /**
453    * @brief It is NOT ok for the test to quit before the timeout triggers
454    */
455   HAVE_NO_QUICK_QUIT,
456 };
457
458 /**
459  * @brief Do we collect statistics at the end?
460  */
461 enum OPTION_COLLECT_STATISTICS {
462   /**
463    * @brief We collect statistics at the end
464    */
465   COLLECT_STATISTICS,
466
467   /**
468    * @brief We do not collect statistics at the end
469    */
470   NO_COLLECT_STATISTICS,
471 };
472
473 /**
474  * @brief Do we collect views during run?
475  */
476 enum OPTION_COLLECT_VIEW {
477   /**
478    * @brief We collect view during run
479    */
480   COLLECT_VIEW,
481
482   /**
483    * @brief We do not collect the view during run
484    */
485   NO_COLLECT_VIEW,
486 };
487
488 /**
489  * Structure to define a single test
490  */
491 struct SingleTestRun
492 {
493   /**
494    * Name of the test
495    */
496   char *name;
497
498   /**
499    * Called with a single peer in order to initialise that peer
500    */
501   InitPeer init_peer;
502
503   /**
504    * Called directly after connecting to the service
505    */
506   PreTest pre_test;
507
508   /**
509    * Main function for each peer
510    */
511   MainTest main_test;
512
513   /**
514    * Callback called once the requested peers are available
515    */
516   ReplyHandle reply_handle;
517
518   /**
519    * Called directly before disconnecting from the service
520    */
521   PostTest post_test;
522
523   /**
524    * Function to evaluate the test results
525    */
526   EvaluationCallback eval_cb;
527
528   /**
529    * Request interval
530    */
531   uint32_t request_interval;
532
533   /**
534    * Number of Requests to make.
535    */
536   uint32_t num_requests;
537
538   /**
539    * Run with (-out) churn
540    */
541   enum OPTION_CHURN have_churn;
542
543   /**
544    * Quit test before timeout?
545    */
546   enum OPTION_QUICK_QUIT have_quick_quit;
547
548   /**
549    * Collect statistics at the end?
550    */
551   enum OPTION_COLLECT_STATISTICS have_collect_statistics;
552
553   /**
554    * Collect view during run?
555    */
556   enum OPTION_COLLECT_VIEW have_collect_view;
557
558   /**
559    * @brief Mark which values from the statistics service to collect at the end
560    * of the run
561    */
562   uint32_t stat_collect_flags;
563 } cur_test_run;
564
565 /**
566  * Did we finish the test?
567  */
568 static int post_test;
569
570 /**
571  * Are we shutting down?
572  */
573 static int in_shutdown;
574
575 /**
576  * Append arguments to file
577  */
578 static void
579 tofile_ (const char *file_name, const char *line)
580 {
581   struct GNUNET_DISK_FileHandle *f;
582   /* char output_buffer[512]; */
583   size_t size;
584   /* int size; */
585   size_t size2;
586
587   if (NULL == (f = GNUNET_DISK_file_open (file_name,
588                                           GNUNET_DISK_OPEN_APPEND |
589                                           GNUNET_DISK_OPEN_WRITE |
590                                           GNUNET_DISK_OPEN_CREATE,
591                                           GNUNET_DISK_PERM_USER_READ |
592                                           GNUNET_DISK_PERM_USER_WRITE |
593                                           GNUNET_DISK_PERM_GROUP_READ |
594                                           GNUNET_DISK_PERM_OTHER_READ)))
595   {
596     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
597                 "Not able to open file %s\n",
598                 file_name);
599     return;
600   }
601   /* size = GNUNET_snprintf (output_buffer,
602                           sizeof (output_buffer),
603                           "%llu %s\n",
604                           GNUNET_TIME_absolute_get ().abs_value_us,
605                           line);
606   if (0 > size)
607   {
608     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
609                 "Failed to write string to buffer (size: %i)\n",
610                 size);
611     return;
612   } */
613
614   size = strlen (line) * sizeof (char);
615
616   size2 = GNUNET_DISK_file_write (f, line, size);
617   if (size != size2)
618   {
619     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
620                 "Unable to write to file! (Size: %lu, size2: %lu)\n",
621                 size,
622                 size2);
623     if (GNUNET_YES != GNUNET_DISK_file_close (f))
624     {
625       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
626                   "Unable to close file\n");
627     }
628     return;
629   }
630
631   if (GNUNET_YES != GNUNET_DISK_file_close (f))
632   {
633     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
634                 "Unable to close file\n");
635   }
636 }
637
638 /**
639  * This function is used to facilitate writing important information to disk
640  */
641 #define tofile(file_name, ...) do {\
642   char tmp_buf[512];\
643     int size;\
644     size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
645     if (0 > size)\
646       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
647                      "Failed to create tmp_buf\n");\
648     else\
649       tofile_(file_name,tmp_buf);\
650   } while (0);
651
652
653 /**
654  * Write the ids and their according index in the given array to a file
655  * Unused
656  */
657 /* static void
658 ids_to_file (char *file_name,
659              struct GNUNET_PeerIdentity *peer_ids,
660              unsigned int num_peer_ids)
661 {
662   unsigned int i;
663
664   for (i=0 ; i < num_peer_ids ; i++)
665   {
666     to_file (file_name,
667              "%u\t%s",
668              i,
669              GNUNET_i2s_full (&peer_ids[i]));
670   }
671 } */
672
673 /**
674  * Test the success of a single test
675  */
676 static int
677 evaluate (void)
678 {
679   unsigned int i;
680   int tmp_ok;
681
682   tmp_ok = 1;
683
684   for (i = 0; i < num_peers; i++)
685   {
686     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687         "%u. peer [%s] received %u of %u expected peer_ids: %i\n",
688         i,
689         GNUNET_i2s (rps_peers[i].peer_id),
690         rps_peers[i].num_recv_ids,
691         rps_peers[i].num_ids_to_request,
692         (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids));
693     tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids);
694   }
695   return tmp_ok? 0 : 1;
696 }
697
698
699 /**
700  * Creates an oplist entry and adds it to the oplist DLL
701  */
702 static struct OpListEntry *
703 make_oplist_entry ()
704 {
705   struct OpListEntry *entry;
706
707   entry = GNUNET_new (struct OpListEntry);
708   GNUNET_CONTAINER_DLL_insert_tail (oplist_head, oplist_tail, entry);
709   return entry;
710 }
711
712
713 /**
714  * @brief Checks if given peer already received its statistics value from the
715  * statistics service.
716  *
717  * @param rps_peer the peer to check for
718  *
719  * @return #GNUNET_YES if so
720  *         #GNUNET_NO otherwise
721  */
722 static int check_statistics_collect_completed_single_peer (
723     const struct RPSPeer *rps_peer)
724 {
725   if (cur_test_run.stat_collect_flags !=
726         (cur_test_run.stat_collect_flags &
727           rps_peer->stat_collected_flags))
728   {
729     return GNUNET_NO;
730   }
731   return GNUNET_YES;
732 }
733 /**
734  * @brief Checks if all peers already received their statistics value from the
735  * statistics service.
736  *
737  * @return #GNUNET_YES if so
738  *         #GNUNET_NO otherwise
739  */
740 static int check_statistics_collect_completed ()
741 {
742   uint32_t i;
743
744   for (i = 0; i < num_peers; i++)
745   {
746     if (GNUNET_NO == check_statistics_collect_completed_single_peer (&rps_peers[i]))
747     {
748       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
749           "At least Peer %" PRIu32 " did not yet receive all statistics values\n",
750           i);
751       return GNUNET_NO;
752     }
753   }
754   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
755       "All peers received their statistics values\n");
756   return GNUNET_YES;
757 }
758
759 /**
760  * Task run on timeout to shut everything down.
761  */
762 static void
763 shutdown_op (void *cls)
764 {
765   unsigned int i;
766
767   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
768               "Shutdown task scheduled, going down.\n");
769   in_shutdown = GNUNET_YES;
770   if (NULL != post_test_task)
771   {
772     GNUNET_SCHEDULER_cancel (post_test_task);
773   }
774   if (NULL != churn_task)
775   {
776     GNUNET_SCHEDULER_cancel (churn_task);
777     churn_task = NULL;
778   }
779   for (i = 0; i < num_peers; i++)
780   {
781     if (NULL != rps_peers[i].rps_handle)
782     {
783       GNUNET_RPS_disconnect (rps_peers[i].rps_handle);
784     }
785     if (NULL != rps_peers[i].op)
786     {
787       GNUNET_TESTBED_operation_done (rps_peers[i].op);
788     }
789   }
790 }
791
792
793 /**
794  * Task run on timeout to collect statistics and potentially shut down.
795  */
796 static void
797 post_test_op (void *cls)
798 {
799   unsigned int i;
800
801   post_test_task = NULL;
802   post_test = GNUNET_YES;
803   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
804               "Post test task scheduled, going down.\n");
805   if (NULL != churn_task)
806   {
807     GNUNET_SCHEDULER_cancel (churn_task);
808     churn_task = NULL;
809   }
810   for (i = 0; i < num_peers; i++)
811   {
812     if (NULL != rps_peers[i].op)
813     {
814       GNUNET_TESTBED_operation_done (rps_peers[i].op);
815       rps_peers[i].op = NULL;
816     }
817     if (NULL != cur_test_run.post_test)
818     {
819       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n", i);
820       cur_test_run.post_test (&rps_peers[i]);
821     }
822   }
823   /* If we do not collect statistics, shut down directly */
824   if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics ||
825       GNUNET_YES == check_statistics_collect_completed())
826   {
827     GNUNET_SCHEDULER_shutdown ();
828   }
829 }
830
831
832 /**
833  * Seed peers.
834  */
835 static void
836 seed_peers (void *cls)
837 {
838   struct RPSPeer *peer = cls;
839   unsigned int amount;
840   unsigned int i;
841
842   // TODO if malicious don't seed mal peers
843   amount = round (.5 * num_peers);
844
845   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding peers:\n");
846   for (i = 0 ; i < amount ; i++)
847     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
848                 i,
849                 GNUNET_i2s (&rps_peer_ids[i]));
850
851   GNUNET_RPS_seed_ids (peer->rps_handle, amount, rps_peer_ids);
852 }
853
854
855 /**
856  * Get the id of peer i.
857  */
858   void
859 info_cb (void *cb_cls,
860          struct GNUNET_TESTBED_Operation *op,
861          const struct GNUNET_TESTBED_PeerInformation *pinfo,
862          const char *emsg)
863 {
864   struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
865
866   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
867   {
868     return;
869   }
870
871   if (NULL == pinfo || NULL != emsg)
872   {
873     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg);
874     GNUNET_TESTBED_operation_done (entry->op);
875     return;
876   }
877
878   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
879               "Peer %u is %s\n",
880               entry->index,
881               GNUNET_i2s (pinfo->result.id));
882
883   rps_peer_ids[entry->index] = *(pinfo->result.id);
884   rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index];
885
886   GNUNET_assert (GNUNET_OK ==
887       GNUNET_CONTAINER_multipeermap_put (peer_map,
888         &rps_peer_ids[entry->index],
889         &rps_peers[entry->index],
890         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
891   tofile ("/tmp/rps/peer_ids",
892            "%u\t%s\n",
893            entry->index,
894            GNUNET_i2s_full (&rps_peer_ids[entry->index]));
895
896   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
897   GNUNET_TESTBED_operation_done (entry->op);
898   GNUNET_free (entry);
899 }
900
901
902 /**
903  * Callback to be called when RPS service connect operation is completed
904  *
905  * @param cls the callback closure from functions generating an operation
906  * @param op the operation that has been finished
907  * @param ca_result the RPS service handle returned from rps_connect_adapter
908  * @param emsg error message in case the operation has failed; will be NULL if
909  *          operation has executed successfully.
910  */
911 static void
912 rps_connect_complete_cb (void *cls,
913                          struct GNUNET_TESTBED_Operation *op,
914                          void *ca_result,
915                          const char *emsg)
916 {
917   struct RPSPeer *rps_peer = cls;
918   struct GNUNET_RPS_Handle *rps = ca_result;
919
920   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
921   {
922     return;
923   }
924
925   rps_peer->rps_handle = rps;
926   rps_peer->online = GNUNET_YES;
927   num_peers_online++;
928
929   GNUNET_assert (op == rps_peer->op);
930   if (NULL != emsg)
931   {
932     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
933                 "Failed to connect to RPS service: %s\n",
934                 emsg);
935     ok = 1;
936     GNUNET_SCHEDULER_shutdown ();
937     return;
938   }
939
940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Started client successfully\n");
941
942   cur_test_run.main_test (rps_peer);
943 }
944
945
946 /**
947  * Adapter function called to establish a connection to
948  * the RPS service.
949  *
950  * @param cls closure
951  * @param cfg configuration of the peer to connect to; will be available until
952  *          GNUNET_TESTBED_operation_done() is called on the operation returned
953  *          from GNUNET_TESTBED_service_connect()
954  * @return service handle to return in 'op_result', NULL on error
955  */
956 static void *
957 rps_connect_adapter (void *cls,
958                                  const struct GNUNET_CONFIGURATION_Handle *cfg)
959 {
960   struct GNUNET_RPS_Handle *h;
961
962   h = GNUNET_RPS_connect (cfg);
963
964   if (NULL != cur_test_run.pre_test)
965     cur_test_run.pre_test (cls, h);
966
967   return h;
968 }
969
970 /**
971  * Called to open a connection to the peer's statistics
972  *
973  * @param cls peer context
974  * @param cfg configuration of the peer to connect to; will be available until
975  *          GNUNET_TESTBED_operation_done() is called on the operation returned
976  *          from GNUNET_TESTBED_service_connect()
977  * @return service handle to return in 'op_result', NULL on error
978  */
979 static void *
980 stat_connect_adapter (void *cls,
981                       const struct GNUNET_CONFIGURATION_Handle *cfg)
982 {
983   struct RPSPeer *peer = cls;
984
985   peer->stats_h = GNUNET_STATISTICS_create ("rps-profiler", cfg);
986   return peer->stats_h;
987 }
988
989 /**
990  * Called to disconnect from peer's statistics service
991  *
992  * @param cls peer context
993  * @param op_result service handle returned from the connect adapter
994  */
995 static void
996 stat_disconnect_adapter (void *cls, void *op_result)
997 {
998   struct RPSPeer *peer = cls;
999
1000   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
1001   //              (peer->stats_h, "core", "# peers connected",
1002   //               stat_iterator, peer));
1003   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
1004   //              (peer->stats_h, "nse", "# peers connected",
1005   //               stat_iterator, peer));
1006   GNUNET_STATISTICS_destroy (op_result, GNUNET_NO);
1007   peer->stats_h = NULL;
1008 }
1009
1010 /**
1011  * Called after successfully opening a connection to a peer's statistics
1012  * service; we register statistics monitoring for CORE and NSE here.
1013  *
1014  * @param cls the callback closure from functions generating an operation
1015  * @param op the operation that has been finished
1016  * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
1017  * @param emsg error message in case the operation has failed; will be NULL if
1018  *          operation has executed successfully.
1019  */
1020 static void
1021 stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
1022                   void *ca_result, const char *emsg )
1023 {
1024   //struct GNUNET_STATISTICS_Handle *sh = ca_result;
1025   //struct RPSPeer *peer = (struct RPSPeer *) cls;
1026
1027   if (NULL != emsg)
1028   {
1029     GNUNET_break (0);
1030     return;
1031   }
1032   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
1033   //              (sh, "core", "# peers connected",
1034   //               stat_iterator, peer));
1035   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
1036   //              (sh, "nse", "# peers connected",
1037   //               stat_iterator, peer));
1038 }
1039
1040
1041 /**
1042  * Adapter function called to destroy connection to
1043  * RPS service.
1044  *
1045  * @param cls closure
1046  * @param op_result service handle returned from the connect adapter
1047  */
1048 static void
1049 rps_disconnect_adapter (void *cls,
1050                                           void *op_result)
1051 {
1052   struct RPSPeer *peer = cls;
1053   struct GNUNET_RPS_Handle *h = op_result;
1054
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "disconnect_adapter()\n");
1056   GNUNET_assert (NULL != peer);
1057   GNUNET_RPS_disconnect (h);
1058   peer->rps_handle = NULL;
1059 }
1060
1061
1062 /***********************************************************************
1063  * Definition of tests
1064 ***********************************************************************/
1065
1066 /**
1067  * Callback to call on receipt of a reply
1068  *
1069  * @param cls closure
1070  * @param n number of peers
1071  * @param recv_peers the received peers
1072  */
1073 static void
1074 default_reply_handle (void *cls,
1075                       uint64_t n,
1076                       const struct GNUNET_PeerIdentity *recv_peers)
1077 {
1078   struct RPSPeer *rps_peer;
1079   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1080   unsigned int i;
1081
1082   rps_peer = pending_rep->rps_peer;
1083   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1084                                rps_peer->pending_rep_tail,
1085                                pending_rep);
1086   rps_peer->num_pending_reps--;
1087   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088               "[%s] got %" PRIu64 " peers:\n",
1089               GNUNET_i2s (rps_peer->peer_id),
1090               n);
1091
1092   for (i = 0; i < n; i++)
1093   {
1094     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095                 "%u: %s\n",
1096                 i,
1097                 GNUNET_i2s (&recv_peers[i]));
1098
1099     rps_peer->num_recv_ids++;
1100   }
1101
1102   if (0 == evaluate () && HAVE_QUICK_QUIT == cur_test_run.have_quick_quit)
1103   {
1104     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test succeeded before timeout\n");
1105     GNUNET_assert (NULL != post_test_task);
1106     GNUNET_SCHEDULER_cancel (post_test_task);
1107     post_test_task = GNUNET_SCHEDULER_add_now (&post_test_op, NULL);
1108     GNUNET_assert (NULL!= post_test_task);
1109   }
1110 }
1111
1112 /**
1113  * Request random peers.
1114  */
1115 static void
1116 request_peers (void *cls)
1117 {
1118   struct PendingRequest *pending_req = cls;
1119   struct RPSPeer *rps_peer;
1120   struct PendingReply *pending_rep;
1121
1122   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1123     return;
1124   rps_peer = pending_req->rps_peer;
1125   GNUNET_assert (1 <= rps_peer->num_pending_reqs);
1126   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
1127                                rps_peer->pending_req_tail,
1128                                pending_req);
1129   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130               "Requesting one peer\n");
1131   pending_rep = GNUNET_new (struct PendingReply);
1132   pending_rep->rps_peer = rps_peer;
1133   pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
1134       1,
1135       cur_test_run.reply_handle,
1136       pending_rep);
1137   GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
1138                                     rps_peer->pending_rep_tail,
1139                                     pending_rep);
1140   rps_peer->num_pending_reps++;
1141   rps_peer->num_pending_reqs--;
1142 }
1143
1144 static void
1145 cancel_pending_req (struct PendingRequest *pending_req)
1146 {
1147   struct RPSPeer *rps_peer;
1148
1149   rps_peer = pending_req->rps_peer;
1150   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
1151                                rps_peer->pending_req_tail,
1152                                pending_req);
1153   rps_peer->num_pending_reqs--;
1154   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1155               "Cancelling pending request\n");
1156   GNUNET_SCHEDULER_cancel (pending_req->request_task);
1157   GNUNET_free (pending_req);
1158 }
1159
1160 static void
1161 cancel_request (struct PendingReply *pending_rep)
1162 {
1163   struct RPSPeer *rps_peer;
1164
1165   rps_peer = pending_rep->rps_peer;
1166   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1167                                rps_peer->pending_rep_tail,
1168                                pending_rep);
1169   rps_peer->num_pending_reps--;
1170   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1171               "Cancelling request\n");
1172   GNUNET_RPS_request_cancel (pending_rep->req_handle);
1173   GNUNET_free (pending_rep);
1174 }
1175
1176
1177 /**
1178  * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
1179  * issued, nor replied
1180  */
1181 void
1182 schedule_missing_requests (struct RPSPeer *rps_peer)
1183 {
1184   unsigned int i;
1185   struct PendingRequest *pending_req;
1186
1187   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1188       "Scheduling %u - %u missing requests\n",
1189       rps_peer->num_ids_to_request,
1190       rps_peer->num_pending_reqs + rps_peer->num_pending_reps);
1191   GNUNET_assert (rps_peer->num_pending_reqs + rps_peer->num_pending_reps <=
1192       rps_peer->num_ids_to_request);
1193   for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
1194        i < rps_peer->num_ids_to_request; i++)
1195   {
1196     pending_req = GNUNET_new (struct PendingRequest);
1197     pending_req->rps_peer = rps_peer;
1198     pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
1199         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1200           cur_test_run.request_interval * i),
1201         request_peers,
1202         pending_req);
1203     GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
1204                                       rps_peer->pending_req_tail,
1205                                       pending_req);
1206     rps_peer->num_pending_reqs++;
1207   }
1208 }
1209
1210 void
1211 cancel_pending_req_rep (struct RPSPeer *rps_peer)
1212 {
1213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214       "Cancelling all (pending) requests.\n");
1215   while (NULL != rps_peer->pending_req_head)
1216     cancel_pending_req (rps_peer->pending_req_head);
1217   GNUNET_assert (0 == rps_peer->num_pending_reqs);
1218   while (NULL != rps_peer->pending_rep_head)
1219     cancel_request (rps_peer->pending_rep_head);
1220   GNUNET_assert (0 == rps_peer->num_pending_reps);
1221 }
1222
1223 /***********************************
1224  * MALICIOUS
1225 ***********************************/
1226
1227 /**
1228  * Initialise only non-mal RPSPeers
1229  */
1230 static void mal_init_peer (struct RPSPeer *rps_peer)
1231 {
1232   if (rps_peer->index >= round (portion * num_peers))
1233     rps_peer->num_ids_to_request = 1;
1234 }
1235
1236
1237 /**
1238  * @brief Set peers to (non-)malicious before execution
1239  *
1240  * Of signature #PreTest
1241  *
1242  * @param rps_peer the peer to set (non-) malicious
1243  * @param h the handle to the service
1244  */
1245 static void
1246 mal_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
1247 {
1248   #ifdef ENABLE_MALICIOUS
1249   uint32_t num_mal_peers;
1250
1251   GNUNET_assert ( (1 >= portion) &&
1252                   (0 <  portion) );
1253   num_mal_peers = round (portion * num_peers);
1254
1255   if (rps_peer->index < num_mal_peers)
1256   {
1257     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258                 "%u. peer [%s] of %" PRIu32 " malicious peers turning malicious\n",
1259                 rps_peer->index,
1260                 GNUNET_i2s (rps_peer->peer_id),
1261                 num_mal_peers);
1262
1263     GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
1264                               rps_peer_ids, target_peer);
1265   }
1266   #endif /* ENABLE_MALICIOUS */
1267 }
1268
1269 static void
1270 mal_cb (struct RPSPeer *rps_peer)
1271 {
1272   uint32_t num_mal_peers;
1273
1274   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1275   {
1276     return;
1277   }
1278
1279   #ifdef ENABLE_MALICIOUS
1280   GNUNET_assert ( (1 >= portion) &&
1281                   (0 <  portion) );
1282   num_mal_peers = round (portion * num_peers);
1283
1284   if (rps_peer->index >= num_mal_peers)
1285   { /* It's useless to ask a malicious peer about a random sample -
1286        it's not sampling */
1287     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1288                                   seed_peers, rps_peer);
1289     schedule_missing_requests (rps_peer);
1290   }
1291   #endif /* ENABLE_MALICIOUS */
1292 }
1293
1294 /***********************************
1295  * CHURN
1296 ***********************************/
1297
1298 static void
1299 churn (void *cls);
1300
1301 /**
1302  * @brief Starts churn
1303  *
1304  * Has signature of #MainTest
1305  *
1306  * This is not implemented too nicely as this is called for each peer, but we
1307  * only need to call it once. (Yes we check that we only schedule the task
1308  * once.)
1309  *
1310  * @param rps_peer The peer it's called for
1311  */
1312 static void
1313 churn_test_cb (struct RPSPeer *rps_peer)
1314 {
1315   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1316   {
1317     return;
1318   }
1319
1320   /* Start churn */
1321   if (HAVE_CHURN == cur_test_run.have_churn && NULL == churn_task)
1322   {
1323     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324                 "Starting churn task\n");
1325     churn_task = GNUNET_SCHEDULER_add_delayed (
1326           GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1327           churn,
1328           NULL);
1329   } else {
1330     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331                 "Not starting churn task\n");
1332   }
1333
1334   schedule_missing_requests (rps_peer);
1335 }
1336
1337 /***********************************
1338  * PROFILER
1339 ***********************************/
1340
1341 /**
1342  * Callback to be called when RPS service is started or stopped at peers
1343  *
1344  * @param cls NULL
1345  * @param op the operation handle
1346  * @param emsg NULL on success; otherwise an error description
1347  */
1348 static void
1349 churn_cb (void *cls,
1350           struct GNUNET_TESTBED_Operation *op,
1351           const char *emsg)
1352 {
1353   // FIXME
1354   struct OpListEntry *entry = cls;
1355
1356   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1357   {
1358     return;
1359   }
1360
1361   GNUNET_TESTBED_operation_done (entry->op);
1362   if (NULL != emsg)
1363   {
1364     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n");
1365     GNUNET_SCHEDULER_shutdown ();
1366     return;
1367   }
1368   GNUNET_assert (0 != entry->delta);
1369
1370   num_peers_online += entry->delta;
1371
1372   if (PEER_GO_OFFLINE == entry->delta)
1373   { /* Peer hopefully just went offline */
1374     if (GNUNET_YES != rps_peers[entry->index].online)
1375     {
1376       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1377                   "peer %s was expected to go offline but is still marked as online\n",
1378                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1379       GNUNET_break (0);
1380     }
1381     else
1382     {
1383       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1384                   "peer %s probably went offline as expected\n",
1385                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1386     }
1387     rps_peers[entry->index].online = GNUNET_NO;
1388   }
1389
1390   else if (PEER_GO_ONLINE < entry->delta)
1391   { /* Peer hopefully just went online */
1392     if (GNUNET_NO != rps_peers[entry->index].online)
1393     {
1394       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1395                   "peer %s was expected to go online but is still marked as offline\n",
1396                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1397       GNUNET_break (0);
1398     }
1399     else
1400     {
1401       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1402                   "peer %s probably went online as expected\n",
1403                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1404       if (NULL != cur_test_run.pre_test)
1405       {
1406         cur_test_run.pre_test (&rps_peers[entry->index],
1407             rps_peers[entry->index].rps_handle);
1408         schedule_missing_requests (&rps_peers[entry->index]);
1409       }
1410     }
1411     rps_peers[entry->index].online = GNUNET_YES;
1412   }
1413   else
1414   {
1415     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1416         "Invalid value for delta: %i\n", entry->delta);
1417     GNUNET_break (0);
1418   }
1419
1420   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
1421   rps_peers[entry->index].entry_op_manage = NULL;
1422   GNUNET_free (entry);
1423   //if (num_peers_in_round[current_round] == peers_running)
1424   //  run_round ();
1425 }
1426
1427 /**
1428  * @brief Set the rps-service up or down for a specific peer
1429  *
1430  * @param i index of action
1431  * @param j index of peer
1432  * @param delta (#PEER_ONLINE_DELTA) down (-1) or up (1)
1433  * @param prob_go_on_off the probability of the action
1434  */
1435 static void
1436 manage_service_wrapper (unsigned int i, unsigned int j,
1437                         enum PEER_ONLINE_DELTA delta,
1438                         double prob_go_on_off)
1439 {
1440   struct OpListEntry *entry = NULL;
1441   uint32_t prob;
1442
1443   /* make sure that management operation is not already scheduled */
1444   if (NULL != rps_peers[j].entry_op_manage)
1445   {
1446     return;
1447   }
1448
1449   prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1450                                    UINT32_MAX);
1451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1452               "%u. selected peer (%u: %s) is %s.\n",
1453               i,
1454               j,
1455               GNUNET_i2s (rps_peers[j].peer_id),
1456               (PEER_GO_ONLINE == delta) ? "online" : "offline");
1457   if (prob < prob_go_on_off * UINT32_MAX)
1458   {
1459     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1460                 "%s goes %s\n",
1461                 GNUNET_i2s (rps_peers[j].peer_id),
1462                 (PEER_GO_OFFLINE == delta) ? "offline" : "online");
1463
1464     if (PEER_GO_OFFLINE == delta)
1465       cancel_pending_req_rep (&rps_peers[j]);
1466     entry = make_oplist_entry ();
1467     entry->delta = delta;
1468     entry->index = j;
1469     entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
1470                                                     testbed_peers[j],
1471                                                     "rps",
1472                                                     &churn_cb,
1473                                                     entry,
1474                                                     (PEER_GO_OFFLINE == delta) ? 0 : 1);
1475     rps_peers[j].entry_op_manage = entry;
1476   }
1477 }
1478
1479
1480 static void
1481 churn (void *cls)
1482 {
1483   unsigned int i;
1484   unsigned int j;
1485   double portion_online;
1486   unsigned int *permut;
1487   double prob_go_offline;
1488   double portion_go_online;
1489   double portion_go_offline;
1490
1491   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1492   {
1493     return;
1494   }
1495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1496               "Churn function executing\n");
1497
1498   churn_task = NULL; /* Should be invalid by now */
1499
1500   /* Compute the probability for an online peer to go offline
1501    * this round */
1502   portion_online = num_peers_online * 1.0 / num_peers;
1503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1504               "Portion online: %f\n",
1505               portion_online);
1506   portion_go_online = ((1 - portion_online) * .5 * .66);
1507   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1508               "Portion that should go online: %f\n",
1509               portion_go_online);
1510   portion_go_offline = (portion_online + portion_go_online) - .75;
1511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512               "Portion that probably goes offline: %f\n",
1513               portion_go_offline);
1514   prob_go_offline = portion_go_offline / (portion_online * .5);
1515   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516               "Probability of a selected online peer to go offline: %f\n",
1517               prob_go_offline);
1518
1519   permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK,
1520                                          (unsigned int) num_peers);
1521
1522   /* Go over 50% randomly chosen peers */
1523   for (i = 0; i < .5 * num_peers; i++)
1524   {
1525     j = permut[i];
1526
1527     /* If online, shut down with certain probability */
1528     if (GNUNET_YES == rps_peers[j].online)
1529     {
1530       manage_service_wrapper (i, j, -1, prob_go_offline);
1531     }
1532
1533     /* If offline, restart with certain probability */
1534     else if (GNUNET_NO == rps_peers[j].online)
1535     {
1536       manage_service_wrapper (i, j, 1, 0.66);
1537     }
1538   }
1539
1540   GNUNET_free (permut);
1541
1542   churn_task = GNUNET_SCHEDULER_add_delayed (
1543         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1544         churn,
1545         NULL);
1546 }
1547
1548
1549 /**
1550  * Initialise given RPSPeer
1551  */
1552 static void profiler_init_peer (struct RPSPeer *rps_peer)
1553 {
1554   if (num_peers - 1 == rps_peer->index)
1555   {
1556     rps_peer->num_ids_to_request = cur_test_run.num_requests;
1557   } else {
1558     rps_peer->num_ids_to_request = 0;
1559   }
1560   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer shall request %i peers\n",
1561               rps_peer->num_ids_to_request);
1562 }
1563
1564
1565 /**
1566  * Callback to call on receipt of a reply
1567  *
1568  * @param cls closure
1569  * @param n number of peers
1570  * @param recv_peers the received peers
1571  */
1572 static void
1573 profiler_reply_handle (void *cls,
1574                       uint64_t n,
1575                       const struct GNUNET_PeerIdentity *recv_peers)
1576 {
1577   struct RPSPeer *rps_peer;
1578   struct RPSPeer *rcv_rps_peer;
1579   char *file_name;
1580   char *file_name_dh;
1581   char *file_name_dhr;
1582   unsigned int i;
1583   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1584
1585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "profiler_reply_handle()\n");
1586   rps_peer = pending_rep->rps_peer;
1587   file_name = "/tmp/rps/received_ids";
1588   file_name_dh = "/tmp/rps/diehard_input";
1589   file_name_dhr = "/tmp/rps/diehard_input_raw";
1590   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1591               "[%s] got %" PRIu64 " peers:\n",
1592               GNUNET_i2s (rps_peer->peer_id),
1593               n);
1594   for (i = 0; i < n; i++)
1595   {
1596     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1597                 "%u: %s\n",
1598                 i,
1599                 GNUNET_i2s (&recv_peers[i]));
1600     tofile (file_name,
1601              "%s\n",
1602              GNUNET_i2s_full (&recv_peers[i]));
1603     rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
1604     GNUNET_assert (NULL != rcv_rps_peer);
1605     tofile (file_name_dh,
1606              "%" PRIu32 "\n",
1607              (uint32_t) rcv_rps_peer->index);
1608     to_file_raw (file_name_dhr,
1609                  &rcv_rps_peer->index,
1610                  sizeof (uint32_t));
1611   }
1612   default_reply_handle (cls, n, recv_peers);
1613 }
1614
1615
1616 static void
1617 profiler_cb (struct RPSPeer *rps_peer)
1618 {
1619   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1620   {
1621     return;
1622   }
1623
1624   /* Start churn */
1625   if (HAVE_CHURN == cur_test_run.have_churn && NULL == churn_task)
1626   {
1627     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1628                 "Starting churn task\n");
1629     churn_task = GNUNET_SCHEDULER_add_delayed (
1630           GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1631           churn,
1632           NULL);
1633   } else {
1634     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635                 "Not starting churn task\n");
1636   }
1637
1638   /* Only request peer ids at one peer.
1639    * (It's the before-last because last one is target of the focussed attack.)
1640    */
1641   if (eval_peer == rps_peer)
1642     schedule_missing_requests (rps_peer);
1643 }
1644
1645 /**
1646  * Function called from #profiler_eval with a filename.
1647  *
1648  * @param cls closure
1649  * @param filename complete filename (absolute path)
1650  * @return #GNUNET_OK to continue to iterate,
1651  *  #GNUNET_NO to stop iteration with no error,
1652  *  #GNUNET_SYSERR to abort iteration with error!
1653  */
1654 int
1655 file_name_cb (void *cls, const char *filename)
1656 {
1657   if (NULL != strstr (filename, "sampler_el"))
1658   {
1659     struct RPS_SamplerElement *s_elem;
1660     struct GNUNET_CRYPTO_AuthKey auth_key;
1661     const char *key_char;
1662     uint32_t i;
1663
1664     key_char = filename + 20; /* Length of "/tmp/rps/sampler_el-" */
1665     tofile (filename, "--------------------------\n");
1666
1667     auth_key = string_to_auth_key (key_char);
1668     s_elem = RPS_sampler_elem_create ();
1669     RPS_sampler_elem_set (s_elem, auth_key);
1670
1671     for (i = 0; i < num_peers; i++)
1672     {
1673       RPS_sampler_elem_next (s_elem, &rps_peer_ids[i]);
1674     }
1675     RPS_sampler_elem_destroy (s_elem);
1676   }
1677   return GNUNET_OK;
1678 }
1679
1680 /**
1681  * This is run after the test finished.
1682  *
1683  * Compute all perfect samples.
1684  */
1685 int
1686 profiler_eval (void)
1687 {
1688   /* Compute perfect sample for each sampler element */
1689   if (-1 == GNUNET_DISK_directory_scan ("/tmp/rps/", file_name_cb, NULL))
1690   {
1691     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Scan of directory failed\n");
1692   }
1693
1694   return evaluate ();
1695 }
1696
1697 static uint32_t fac (uint32_t x)
1698 {
1699   if (1 >= x)
1700   {
1701     return x;
1702   }
1703   return x * fac (x - 1);
1704 }
1705
1706 static uint32_t binom (uint32_t n, uint32_t k)
1707 {
1708   //GNUNET_assert (n >= k);
1709   if (k > n) return 0;
1710   if (0 > n) return 0;
1711   if (0 > k) return 0;
1712   if (0 == k) return 1;
1713   return fac (n)
1714     /
1715     fac(k) * fac(n - k);
1716 }
1717
1718 /**
1719  * @brief is b in view of a?
1720  *
1721  * @param a
1722  * @param b
1723  *
1724  * @return
1725  */
1726 static int is_in_view (uint32_t a, uint32_t b)
1727 {
1728   uint32_t i;
1729   for (i = 0; i < rps_peers[a].cur_view_count; i++)
1730   {
1731     if (0 == memcmp (rps_peers[b].peer_id,
1732           &rps_peers[a].cur_view[i],
1733           sizeof (struct GNUNET_PeerIdentity)))
1734     {
1735       return GNUNET_YES;
1736     }
1737   }
1738   return GNUNET_NO;
1739 }
1740
1741 static uint32_t get_idx_of_pid (const struct GNUNET_PeerIdentity *pid)
1742 {
1743   uint32_t i;
1744
1745   for (i = 0; i < num_peers; i++)
1746   {
1747     if (0 == memcmp (pid,
1748           rps_peers[i].peer_id,
1749           sizeof (struct GNUNET_PeerIdentity)))
1750     {
1751       return i;
1752     }
1753   }
1754   //return 0; /* Should not happen - make compiler happy */
1755   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1756              "No known _PeerIdentity %s!\n",
1757              GNUNET_i2s_full (pid));
1758   GNUNET_assert (0);
1759 }
1760
1761 /**
1762  * @brief Counts number of peers in view of a that have b in their view
1763  *
1764  * @param a
1765  * @param uint32_tb
1766  *
1767  * @return
1768  */
1769 static uint32_t count_containing_views (uint32_t a, uint32_t b)
1770 {
1771   uint32_t i;
1772   uint32_t peer_idx;
1773   uint32_t count = 0;
1774
1775   for (i = 0; i < rps_peers[a].cur_view_count; i++)
1776   {
1777     peer_idx = get_idx_of_pid (&rps_peers[a].cur_view[i]);
1778     if (GNUNET_YES == is_in_view (peer_idx, b))
1779     {
1780       count++;
1781     }
1782   }
1783   return count;
1784 }
1785
1786 /**
1787  * @brief Computes the probability for each other peer to be selected by the
1788  * sampling process based on the views of all peers
1789  *
1790  * @param peer_idx index of the peer that is about to sample
1791  */
1792 static void compute_probabilities (uint32_t peer_idx)
1793 {
1794   //double probs[num_peers] = { 0 };
1795   double probs[num_peers];
1796   size_t probs_as_str_size = (num_peers * 10 + 1) * sizeof (char);
1797   char *probs_as_str = GNUNET_malloc (probs_as_str_size);
1798   char *probs_as_str_cpy;
1799   uint32_t i;
1800   double prob_push;
1801   double prob_pull;
1802   uint32_t view_size;
1803   uint32_t cont_views;
1804   uint32_t number_of_being_in_pull_events;
1805   int tmp;
1806   uint32_t count_non_zero_prob = 0;
1807
1808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1809       "Computing probabilities for peer %" PRIu32 "\n", peer_idx);
1810   /* Firstly without knowledge of old views */
1811   for (i = 0; i < num_peers; i++)
1812   {
1813     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1814         "\tfor peer %" PRIu32 ":\n", i);
1815     view_size = rps_peers[i].cur_view_count;
1816     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1817         "\t\tview_size: %" PRIu32 "\n", view_size);
1818     /* For peer i the probability of being sampled is
1819      * evenly distributed among all possibly observed peers. */
1820     /* We could have observed a peer in three cases:
1821      *   1. peer sent a push
1822      *   2. peer was contained in a pull reply
1823      *   3. peer was in history (sampler) - ignored for now */
1824     /* 1. Probability of having received a push from peer i */
1825     if ((GNUNET_YES == is_in_view (i, peer_idx)) &&
1826         (1 <= (0.45 * view_size)))
1827     {
1828       prob_push = 1.0 * binom (0.45 * view_size, 1)
1829         /
1830         binom (view_size, 0.45 * view_size);
1831       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1832                  "\t\t%" PRIu32 " is in %" PRIu32 "'s view, prob: %f\n",
1833                  peer_idx,
1834                  i,
1835                  prob_push);
1836       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1837                  "\t\tposs choices from view: %" PRIu32 ", containing i: %" PRIu32 "\n",
1838                  binom (view_size, 0.45 * view_size),
1839                  binom (0.45 * view_size, 1));
1840     } else {
1841       prob_push = 0;
1842       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1843                  "\t\t%" PRIu32 " is not in %" PRIu32 "'s view, prob: 0\n",
1844                  peer_idx,
1845                  i);
1846     }
1847     /* 2. Probability of peer i being contained in pulls */
1848     view_size = rps_peers[peer_idx].cur_view_count;
1849     cont_views = count_containing_views (peer_idx, i);
1850     number_of_being_in_pull_events =
1851       (binom (view_size, 0.45 * view_size) -
1852        binom (view_size - cont_views, 0.45 * view_size));
1853     if (0 != number_of_being_in_pull_events)
1854     {
1855       prob_pull = number_of_being_in_pull_events
1856         /
1857         (1.0 * binom (view_size, 0.45 * view_size));
1858     } else
1859     {
1860       prob_pull = 0;
1861     }
1862     probs[i] = prob_push + prob_pull - (prob_push * prob_pull);
1863     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1864                "\t\t%" PRIu32 " has %" PRIu32 " of %" PRIu32
1865                " peers in its view who know %" PRIu32 " prob: %f\n",
1866                peer_idx,
1867                cont_views,
1868                view_size,
1869                i,
1870                prob_pull);
1871     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1872                "\t\tnumber of possible pull combinations: %" PRIu32 "\n",
1873                binom (view_size, 0.45 * view_size));
1874     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1875                "\t\tnumber of possible pull combinations without %" PRIu32
1876                ": %" PRIu32 "\n",
1877                i,
1878                binom (view_size - cont_views, 0.45 * view_size));
1879     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1880                "\t\tnumber of possible pull combinations with %" PRIu32
1881                ": %" PRIu32 "\n",
1882                i,
1883                number_of_being_in_pull_events);
1884
1885     if (0 != probs[i]) count_non_zero_prob++;
1886   }
1887   /* normalize */
1888   if (0 != count_non_zero_prob)
1889   {
1890     for (i = 0; i < num_peers; i++)
1891     {
1892       probs[i] = probs[i] * (1.0 / count_non_zero_prob);
1893     }
1894   } else {
1895     for (i = 0; i < num_peers; i++)
1896     {
1897       probs[i] = 0;
1898     }
1899   }
1900   /* str repr */
1901   for (i = 0; i < num_peers; i++)
1902   {
1903     probs_as_str_cpy = GNUNET_strndup (probs_as_str, probs_as_str_size);
1904     tmp = GNUNET_snprintf (probs_as_str,
1905                            probs_as_str_size,
1906                            "%s %7.6f", probs_as_str_cpy, probs[i]);
1907     GNUNET_free (probs_as_str_cpy);
1908     GNUNET_assert (0 <= tmp);
1909   }
1910
1911   to_file_w_len (rps_peers[peer_idx].file_name_probs,
1912                  probs_as_str_size,
1913                  probs_as_str);
1914   GNUNET_free (probs_as_str);
1915 }
1916
1917 /**
1918  * @brief This counts the number of peers in which views a given peer occurs.
1919  *
1920  * It also stores this value in the rps peer.
1921  *
1922  * @param peer_idx the index of the peer to count the representation
1923  *
1924  * @return the number of occurrences
1925  */
1926 static uint32_t count_peer_in_views_2 (uint32_t peer_idx)
1927 {
1928   uint32_t i, j;
1929   uint32_t count = 0;
1930
1931   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
1932   {
1933     for (j = 0; j < rps_peers[i].cur_view_count; j++) /* entry in view */
1934     {
1935       if (0 == memcmp (rps_peers[peer_idx].peer_id,
1936             &rps_peers[i].cur_view[j],
1937             sizeof (struct GNUNET_PeerIdentity)))
1938       {
1939         count++;
1940         break;
1941       }
1942     }
1943   }
1944   rps_peers[peer_idx].count_in_views = count;
1945   return count;
1946 }
1947
1948 static uint32_t cumulated_view_sizes ()
1949 {
1950   uint32_t i;
1951
1952   view_sizes = 0;
1953   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
1954   {
1955     view_sizes += rps_peers[i].cur_view_count;
1956   }
1957   return view_sizes;
1958 }
1959
1960 static void count_peer_in_views (uint32_t *count_peers)
1961 {
1962   uint32_t i, j;
1963
1964   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
1965   {
1966     for (j = 0; j < rps_peers[i].cur_view_count; j++) /* entry in view */
1967     {
1968       if (0 == memcmp (rps_peers[i].peer_id,
1969             &rps_peers[i].cur_view[j],
1970             sizeof (struct GNUNET_PeerIdentity)))
1971       {
1972         count_peers[i]++;
1973       }
1974     }
1975   }
1976 }
1977
1978 void compute_diversity ()
1979 {
1980   uint32_t i;
1981   /* ith entry represents the numer of occurrences in other peer's views */
1982   uint32_t *count_peers = GNUNET_new_array (num_peers, uint32_t);
1983   uint32_t views_total_size;
1984   double expected;
1985   /* deviation from expected number of peers */
1986   double *deviation = GNUNET_new_array (num_peers, double);
1987
1988   views_total_size = 0;
1989   expected = 0;
1990
1991   /* For each peer count its representation in other peer's views*/
1992   for (i = 0; i < num_peers; i++) /* Peer to count */
1993   {
1994     views_total_size += rps_peers[i].cur_view_count;
1995     count_peer_in_views (count_peers);
1996     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1997                "Counted representation of %" PRIu32 "th peer [%s]: %" PRIu32"\n",
1998                i,
1999                GNUNET_i2s (rps_peers[i].peer_id),
2000                count_peers[i]);
2001   }
2002
2003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2004              "size of all views combined: %" PRIu32 "\n",
2005              views_total_size);
2006   expected = ((double) 1/num_peers) * views_total_size;
2007   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2008              "Expected number of occurrences of each peer in all views: %f\n",
2009              expected);
2010   for (i = 0; i < num_peers; i++) /* Peer to count */
2011   {
2012     deviation[i] = expected - count_peers[i];
2013     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2014                "Deviation from expectation: %f\n", deviation[i]);
2015   }
2016   GNUNET_free (count_peers);
2017   GNUNET_free (deviation);
2018 }
2019
2020 void print_view_sizes()
2021 {
2022   uint32_t i;
2023
2024   for (i = 0; i < num_peers; i++) /* Peer to count */
2025   {
2026     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2027                "View size of %" PRIu32 ". [%s] is %" PRIu32 "\n",
2028                i,
2029                GNUNET_i2s (rps_peers[i].peer_id),
2030                rps_peers[i].cur_view_count);
2031   }
2032 }
2033
2034 void all_views_updated_cb()
2035 {
2036   compute_diversity();
2037   print_view_sizes();
2038 }
2039
2040 void view_update_cb (void *cls,
2041                      uint64_t view_size,
2042                      const struct GNUNET_PeerIdentity *peers)
2043 {
2044   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2045               "View was updated (%" PRIu64 ")\n", view_size);
2046   struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
2047   to_file ("/tmp/rps/view_sizes.txt",
2048          "%" PRIu64 " %" PRIu32 "",
2049          rps_peer->index,
2050          view_size);
2051   for (int i = 0; i < view_size; i++)
2052   {
2053     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2054                "\t%s\n", GNUNET_i2s (&peers[i]));
2055   }
2056   GNUNET_array_grow (rps_peer->cur_view,
2057                      rps_peer->cur_view_count,
2058                      view_size);
2059   //*rps_peer->cur_view = *peers;
2060   GNUNET_memcpy (rps_peer->cur_view,
2061                  peers,
2062                  view_size * sizeof (struct GNUNET_PeerIdentity));
2063   to_file ("/tmp/rps/count_in_views.txt",
2064          "%" PRIu64 " %" PRIu32 "",
2065          rps_peer->index,
2066          count_peer_in_views_2 (rps_peer->index));
2067   cumulated_view_sizes();
2068   if (0 != view_size)
2069   {
2070     to_file ("/tmp/rps/repr.txt",
2071            "%" PRIu64 /* index */
2072            " %" PRIu32 /* occurrence in views */
2073            " %" PRIu32 /* view sizes */
2074            " %f" /* fraction of repr in views */
2075            " %f" /* average view size */
2076            " %f" /* prob of occurrence in view slot */
2077            " %f" "", /* exp frac of repr in views */
2078            rps_peer->index,
2079            count_peer_in_views_2 (rps_peer->index),
2080            view_sizes,
2081            count_peer_in_views_2 (rps_peer->index) / (view_size * 1.0), /* fraction of representation in views */
2082            view_sizes / (view_size * 1.0), /* average view size */
2083            1.0 /view_size, /* prob of occurrence in view slot */
2084            (1.0/view_size) * (view_sizes/view_size) /* expected fraction of repr in views */
2085            );
2086   }
2087   compute_probabilities (rps_peer->index);
2088   all_views_updated_cb();
2089 }
2090
2091 static void
2092 pre_profiler (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
2093 {
2094   rps_peer->file_name_probs =
2095     store_prefix_file_name (rps_peer->peer_id, "probs");
2096   GNUNET_RPS_view_request (h, 0, view_update_cb, rps_peer);
2097 }
2098
2099 void write_final_stats (void){
2100   uint32_t i;
2101
2102   for (i = 0; i < num_peers; i++)
2103   {
2104     to_file ("/tmp/rps/final_stats.dat",
2105              "%" PRIu32 " " /* index */
2106              "%s %" /* id */
2107              PRIu64 " %" /* rounds */
2108              PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" /* blocking */
2109              PRIu64 " %" PRIu64 " %" PRIu64 " %" /* issued */
2110              PRIu64 " %" PRIu64 " %" PRIu64 " %" /* sent */
2111              PRIu64 " %" PRIu64 " %" PRIu64 /* recv */,
2112              i,
2113              GNUNET_i2s (rps_peers[i].peer_id),
2114              rps_peers[i].num_rounds,
2115              rps_peers[i].num_blocks,
2116              rps_peers[i].num_blocks_many_push,
2117              rps_peers[i].num_blocks_no_push,
2118              rps_peers[i].num_blocks_no_pull,
2119              rps_peers[i].num_blocks_many_push_no_pull,
2120              rps_peers[i].num_blocks_no_push_no_pull,
2121              rps_peers[i].num_issued_push,
2122              rps_peers[i].num_issued_pull_req,
2123              rps_peers[i].num_issued_pull_rep,
2124              rps_peers[i].num_sent_push,
2125              rps_peers[i].num_sent_pull_req,
2126              rps_peers[i].num_sent_pull_rep,
2127              rps_peers[i].num_recv_push,
2128              rps_peers[i].num_recv_pull_req,
2129              rps_peers[i].num_recv_pull_rep);
2130   }
2131 }
2132
2133 /**
2134  * Continuation called by #GNUNET_STATISTICS_get() functions.
2135  *
2136  * Remembers that this specific statistics value was received for this peer.
2137  * Checks whether all peers received their statistics yet.
2138  * Issues the shutdown.
2139  *
2140  * @param cls closure
2141  * @param success #GNUNET_OK if statistics were
2142  *        successfully obtained, #GNUNET_SYSERR if not.
2143  */
2144 void
2145 post_test_shutdown_ready_cb (void *cls,
2146                              int success)
2147 {
2148   struct STATcls *stat_cls = (struct STATcls *) cls;
2149   struct RPSPeer *rps_peer = stat_cls->rps_peer;
2150   if (GNUNET_OK == success)
2151   {
2152     /* set flag that we we got the value */
2153     rps_peer->stat_collected_flags |= stat_cls->stat_type;
2154   } else {
2155     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2156         "Peer %u did not receive statistics value\n",
2157         rps_peer->index);
2158     GNUNET_free (stat_cls);
2159     GNUNET_break (0);
2160   }
2161
2162   if (NULL != rps_peer->stat_op &&
2163       GNUNET_YES == check_statistics_collect_completed_single_peer (rps_peer))
2164   {
2165     GNUNET_TESTBED_operation_done (rps_peer->stat_op);
2166   }
2167
2168   write_final_stats ();
2169   if (GNUNET_YES == check_statistics_collect_completed())
2170   {
2171     //write_final_stats ();
2172     GNUNET_free (stat_cls);
2173     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2174         "Shutting down\n");
2175     GNUNET_SCHEDULER_shutdown ();
2176   } else {
2177     GNUNET_free (stat_cls);
2178   }
2179 }
2180
2181 /**
2182  * @brief Converts string representation to the corresponding #STAT_TYPE enum.
2183  *
2184  * @param stat_str string representation of statistics specifier
2185  *
2186  * @return corresponding enum
2187  */
2188 enum STAT_TYPE stat_str_2_type (const char *stat_str)
2189 {
2190   if (0 == strncmp ("# rounds blocked - no pull replies", stat_str, strlen ("# rounds blocked - no pull replies")))
2191   {
2192     return STAT_TYPE_BLOCKS_NO_PULL;
2193   }
2194   else if (0 == strncmp ("# rounds blocked - too many pushes, no pull replies", stat_str, strlen ("# rounds blocked - too many pushes, no pull replies")))
2195   {
2196     return STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL;
2197   }
2198   else if (0 == strncmp ("# rounds blocked - too many pushes", stat_str, strlen ("# rounds blocked - too many pushes")))
2199   {
2200     return STAT_TYPE_BLOCKS_MANY_PUSH;
2201   }
2202   else if (0 == strncmp ("# rounds blocked - no pushes, no pull replies", stat_str, strlen ("# rounds blocked - no pushes, no pull replies")))
2203   {
2204     return STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL;
2205   }
2206   else if (0 == strncmp ("# rounds blocked - no pushes", stat_str, strlen ("# rounds blocked - no pushes")))
2207   {
2208     return STAT_TYPE_BLOCKS_NO_PUSH;
2209   }
2210   else if (0 == strncmp ("# rounds blocked", stat_str, strlen ("# rounds blocked")))
2211   {
2212     return STAT_TYPE_BLOCKS;
2213   }
2214   else if (0 == strncmp ("# rounds", stat_str, strlen ("# rounds")))
2215   {
2216     return STAT_TYPE_ROUNDS;
2217   }
2218   else if (0 == strncmp ("# push send issued", stat_str, strlen ("# push send issued")))
2219   {
2220     return STAT_TYPE_ISSUED_PUSH_SEND;
2221   }
2222   else if (0 == strncmp ("# pull request send issued", stat_str, strlen ("# pull request send issued")))
2223   {
2224     return STAT_TYPE_ISSUED_PULL_REQ;
2225   }
2226   else if (0 == strncmp ("# pull reply send issued", stat_str, strlen ("# pull reply send issued")))
2227   {
2228     return STAT_TYPE_ISSUED_PULL_REP;
2229   }
2230   else if (0 == strncmp ("# pushes sent", stat_str, strlen ("# pushes sent")))
2231   {
2232     return STAT_TYPE_SENT_PUSH_SEND;
2233   }
2234   else if (0 == strncmp ("# pull requests sent", stat_str, strlen ("# pull requests sent")))
2235   {
2236     return STAT_TYPE_SENT_PULL_REQ;
2237   }
2238   else if (0 == strncmp ("# pull replys sent", stat_str, strlen ("# pull replys sent")))
2239   {
2240     return STAT_TYPE_SENT_PULL_REP;
2241   }
2242   else if (0 == strncmp ("# push message received", stat_str, strlen ("# push message received")))
2243   {
2244     return STAT_TYPE_RECV_PUSH_SEND;
2245   }
2246   else if (0 == strncmp ("# pull request message received", stat_str, strlen ("# pull request message received")))
2247   {
2248     return STAT_TYPE_RECV_PULL_REQ;
2249   }
2250   else if (0 == strncmp ("# pull reply messages received", stat_str, strlen ("# pull reply messages received")))
2251   {
2252     return STAT_TYPE_RECV_PULL_REP;
2253   }
2254   return STAT_TYPE_MAX;
2255 }
2256
2257
2258 /**
2259  * @brief Converts #STAT_TYPE enum to the equivalent string representation that
2260  * is stored with the statistics service.
2261  *
2262  * @param stat_type #STAT_TYPE enum
2263  *
2264  * @return string representation that matches statistics value
2265  */
2266 char* stat_type_2_str (enum STAT_TYPE stat_type)
2267 {
2268   switch (stat_type)
2269   {
2270     case STAT_TYPE_ROUNDS:
2271       return "# rounds";
2272     case STAT_TYPE_BLOCKS:
2273       return "# rounds blocked";
2274     case STAT_TYPE_BLOCKS_MANY_PUSH:
2275       return "# rounds blocked - too many pushes";
2276     case STAT_TYPE_BLOCKS_NO_PUSH:
2277       return "# rounds blocked - no pushes";
2278     case STAT_TYPE_BLOCKS_NO_PULL:
2279       return "# rounds blocked - no pull replies";
2280     case STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL:
2281       return "# rounds blocked - too many pushes, no pull replies";
2282     case STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL:
2283       return "# rounds blocked - no pushes, no pull replies";
2284     case STAT_TYPE_ISSUED_PUSH_SEND:
2285       return "# push send issued";
2286     case STAT_TYPE_ISSUED_PULL_REQ:
2287       return "# pull request send issued";
2288     case STAT_TYPE_ISSUED_PULL_REP:
2289       return "# pull reply send issued";
2290     case STAT_TYPE_SENT_PUSH_SEND:
2291       return "# pushes sent";
2292     case STAT_TYPE_SENT_PULL_REQ:
2293       return "# pull requests sent";
2294     case STAT_TYPE_SENT_PULL_REP:
2295       return "# pull replys sent";
2296     case STAT_TYPE_RECV_PUSH_SEND:
2297       return "# push message received";
2298     case STAT_TYPE_RECV_PULL_REQ:
2299       return "# pull request message received";
2300     case STAT_TYPE_RECV_PULL_REP:
2301       return "# pull reply messages received";
2302     case STAT_TYPE_MAX:
2303     default:
2304       return "ERROR";
2305       ;
2306   }
2307 }
2308
2309 /**
2310  * Callback function to process statistic values.
2311  *
2312  * @param cls closure
2313  * @param subsystem name of subsystem that created the statistic
2314  * @param name the name of the datum
2315  * @param value the current value
2316  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
2317  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
2318  */
2319 int
2320 stat_iterator (void *cls,
2321                const char *subsystem,
2322                const char *name,
2323                uint64_t value,
2324                int is_persistent)
2325 {
2326   const struct STATcls *stat_cls = (const struct STATcls *) cls;
2327   struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer;
2328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n",
2329       //stat_type_2_str (stat_cls->stat_type),
2330       name,
2331       value);
2332   to_file (rps_peer->file_name_stats,
2333           "%s: %" PRIu64 "\n",
2334           name,
2335           value);
2336   switch (stat_str_2_type (name))
2337   {
2338     case STAT_TYPE_ROUNDS:
2339       rps_peer->num_rounds = value;
2340       break;
2341     case STAT_TYPE_BLOCKS:
2342       rps_peer->num_blocks = value;
2343       break;
2344     case STAT_TYPE_BLOCKS_MANY_PUSH:
2345       rps_peer->num_blocks_many_push = value;
2346       break;
2347     case STAT_TYPE_BLOCKS_NO_PUSH:
2348       rps_peer->num_blocks_no_push = value;
2349       break;
2350     case STAT_TYPE_BLOCKS_NO_PULL:
2351       rps_peer->num_blocks_no_pull = value;
2352       break;
2353     case STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL:
2354       rps_peer->num_blocks_many_push_no_pull = value;
2355       break;
2356     case STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL:
2357       rps_peer->num_blocks_no_push_no_pull = value;
2358       break;
2359     case STAT_TYPE_ISSUED_PUSH_SEND:
2360       rps_peer->num_issued_push = value;
2361       break;
2362     case STAT_TYPE_ISSUED_PULL_REQ:
2363       rps_peer->num_issued_pull_req = value;
2364       break;
2365     case STAT_TYPE_ISSUED_PULL_REP:
2366       rps_peer->num_issued_pull_rep = value;
2367       break;
2368     case STAT_TYPE_SENT_PUSH_SEND:
2369       rps_peer->num_sent_push = value;
2370       break;
2371     case STAT_TYPE_SENT_PULL_REQ:
2372       rps_peer->num_sent_pull_req = value;
2373       break;
2374     case STAT_TYPE_SENT_PULL_REP:
2375       rps_peer->num_sent_pull_rep = value;
2376       break;
2377     case STAT_TYPE_RECV_PUSH_SEND:
2378       rps_peer->num_recv_push = value;
2379       break;
2380     case STAT_TYPE_RECV_PULL_REQ:
2381       rps_peer->num_recv_pull_req = value;
2382       break;
2383     case STAT_TYPE_RECV_PULL_REP:
2384       rps_peer->num_recv_pull_rep = value;
2385       break;
2386     case STAT_TYPE_MAX:
2387     default:
2388       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2389                  "Unknown statistics string: %s\n",
2390                  name);
2391       break;
2392   }
2393   return GNUNET_OK;
2394 }
2395
2396 void post_profiler (struct RPSPeer *rps_peer)
2397 {
2398   if (COLLECT_STATISTICS != cur_test_run.have_collect_statistics)
2399   {
2400     return;
2401   }
2402
2403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2404       "Going to request statistic values with mask 0x%" PRIx32 "\n",
2405       cur_test_run.stat_collect_flags);
2406
2407   struct STATcls *stat_cls;
2408   uint32_t stat_type;
2409   for (stat_type = STAT_TYPE_ROUNDS;
2410       stat_type < STAT_TYPE_MAX;
2411       stat_type = stat_type <<1)
2412   {
2413     if (stat_type & cur_test_run.stat_collect_flags)
2414     {
2415       stat_cls = GNUNET_malloc (sizeof (struct STATcls));
2416       stat_cls->rps_peer = rps_peer;
2417       stat_cls->stat_type = stat_type;
2418       rps_peer->file_name_stats =
2419         store_prefix_file_name (rps_peer->peer_id, "stats");
2420       GNUNET_STATISTICS_get (rps_peer->stats_h,
2421                              "rps",
2422                              stat_type_2_str (stat_type),
2423                              post_test_shutdown_ready_cb,
2424                              stat_iterator,
2425                              (struct STATcls *) stat_cls);
2426       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2427           "Requested statistics for %s (peer %" PRIu32 ")\n",
2428           stat_type_2_str (stat_type),
2429           rps_peer->index);
2430     }
2431   }
2432 }
2433
2434
2435 /***********************************************************************
2436  * /Definition of tests
2437 ***********************************************************************/
2438
2439
2440 /**
2441  * Actual "main" function for the testcase.
2442  *
2443  * @param cls closure
2444  * @param h the run handle
2445  * @param n_peers number of peers in 'peers'
2446  * @param peers handle to peers run in the testbed
2447  * @param links_succeeded the number of overlay link connection attempts that
2448  *          succeeded
2449  * @param links_failed the number of overlay link connection attempts that
2450  *          failed
2451  */
2452 static void
2453 test_run (void *cls,
2454      struct GNUNET_TESTBED_RunHandle *h,
2455      unsigned int n_peers,
2456      struct GNUNET_TESTBED_Peer **peers,
2457      unsigned int links_succeeded,
2458      unsigned int links_failed)
2459 {
2460   unsigned int i;
2461   struct OpListEntry *entry;
2462
2463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "RUN was called\n");
2464   printf ("test 1\n");
2465
2466   /* Check whether we timed out */
2467   if (n_peers != num_peers ||
2468       NULL == peers ||
2469       0 == links_succeeded)
2470   {
2471     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Going down due to args (eg. timeout)\n");
2472     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tn_peers: %u\n", n_peers);
2473     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tnum_peers: %" PRIu32 "\n", num_peers);
2474     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tpeers: %p\n", peers);
2475     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tlinks_succeeded: %u\n", links_succeeded);
2476     GNUNET_SCHEDULER_shutdown ();
2477     return;
2478   }
2479
2480
2481   /* Initialize peers */
2482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "going to initialise peers\n");
2483   testbed_peers = peers;
2484   num_peers_online = 0;
2485   for (i = 0; i < num_peers; i++)
2486   {
2487     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "initialising %u\n", i);
2488     entry = make_oplist_entry ();
2489     entry->index = i;
2490     rps_peers[i].index = i;
2491     if (NULL != cur_test_run.init_peer)
2492       cur_test_run.init_peer (&rps_peers[i]);
2493     if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
2494     {
2495       rps_peers->cur_view_count = 0;
2496       rps_peers->cur_view = NULL;
2497     }
2498     entry->op = GNUNET_TESTBED_peer_get_information (peers[i],
2499                                                      GNUNET_TESTBED_PIT_IDENTITY,
2500                                                      &info_cb,
2501                                                      entry);
2502   }
2503
2504   /* Bring peers up */
2505   GNUNET_assert (num_peers == n_peers);
2506   for (i = 0; i < n_peers; i++)
2507   {
2508     rps_peers[i].index = i;
2509     rps_peers[i].op =
2510       GNUNET_TESTBED_service_connect (&rps_peers[i],
2511                                       peers[i],
2512                                       "rps",
2513                                       &rps_connect_complete_cb,
2514                                       &rps_peers[i],
2515                                       &rps_connect_adapter,
2516                                       &rps_disconnect_adapter,
2517                                       &rps_peers[i]);
2518     /* Connect all peers to statistics service */
2519     if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics)
2520     {
2521       rps_peers[i].stat_op =
2522         GNUNET_TESTBED_service_connect (NULL,
2523                                         peers[i],
2524                                         "statistics",
2525                                         stat_complete_cb,
2526                                         &rps_peers[i],
2527                                         &stat_connect_adapter,
2528                                         &stat_disconnect_adapter,
2529                                         &rps_peers[i]);
2530     }
2531   }
2532
2533   if (NULL != churn_task)
2534     GNUNET_SCHEDULER_cancel (churn_task);
2535   post_test_task = GNUNET_SCHEDULER_add_delayed (timeout, &post_test_op, NULL);
2536   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
2537       (timeout_s * 1.2) + 0.1 * num_peers);
2538   shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
2539   shutdown_task = GNUNET_SCHEDULER_add_shutdown (shutdown_op, NULL);
2540
2541 }
2542
2543
2544 /**
2545  * Entry point for the testcase, sets up the testbed.
2546  *
2547  * @param argc unused
2548  * @param argv unused
2549  * @return 0 on success
2550  */
2551 static void
2552 run (void *cls,
2553     char *const *args,
2554     const char *cfgfile,
2555     const struct GNUNET_CONFIGURATION_Handle *cfg)
2556 {
2557   //int ret_value;
2558
2559   /* Defaults for tests */
2560   churn_task = NULL;
2561
2562   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
2563   cur_test_run.name = "test-rps-profiler";
2564   num_peers = 10;
2565   mal_type = 3;
2566   cur_test_run.init_peer = profiler_init_peer;
2567   //cur_test_run.pre_test = mal_pre;
2568   cur_test_run.pre_test = pre_profiler;
2569   cur_test_run.main_test = profiler_cb;
2570   cur_test_run.reply_handle = profiler_reply_handle;
2571   cur_test_run.eval_cb = profiler_eval;
2572   cur_test_run.post_test = post_profiler;
2573   cur_test_run.request_interval = 2;
2574   cur_test_run.num_requests = 5;
2575   //cur_test_run.have_churn = HAVE_CHURN;
2576   cur_test_run.have_churn = HAVE_NO_CHURN;
2577   cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
2578   cur_test_run.have_collect_statistics = COLLECT_STATISTICS;
2579   cur_test_run.stat_collect_flags = STAT_TYPE_ROUNDS |
2580                                     STAT_TYPE_BLOCKS |
2581                                     STAT_TYPE_BLOCKS_MANY_PUSH |
2582                                     STAT_TYPE_BLOCKS_NO_PUSH |
2583                                     STAT_TYPE_BLOCKS_NO_PULL |
2584                                     STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL |
2585                                     STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL |
2586                                     STAT_TYPE_ISSUED_PUSH_SEND |
2587                                     STAT_TYPE_ISSUED_PULL_REQ |
2588                                     STAT_TYPE_ISSUED_PULL_REP |
2589                                     STAT_TYPE_SENT_PUSH_SEND |
2590                                     STAT_TYPE_SENT_PULL_REQ |
2591                                     STAT_TYPE_SENT_PULL_REP |
2592                                     STAT_TYPE_RECV_PUSH_SEND |
2593                                     STAT_TYPE_RECV_PULL_REQ |
2594                                     STAT_TYPE_RECV_PULL_REP;
2595   cur_test_run.have_collect_view = COLLECT_VIEW;
2596   timeout_s = 300;
2597
2598   /* 'Clean' directory */
2599   (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
2600   GNUNET_DISK_directory_create ("/tmp/rps/");
2601   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, timeout_s);
2602
2603   rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
2604   peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
2605   rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
2606   if ( (2 == mal_type) ||
2607        (3 == mal_type))
2608     target_peer = &rps_peer_ids[num_peers - 2];
2609   if (profiler_eval == cur_test_run.eval_cb)
2610     eval_peer = &rps_peers[num_peers - 1];    /* FIXME: eval_peer could be a
2611                                                  malicious peer if not careful
2612                                                  with the malicious portion */
2613
2614   ok = 1;
2615   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2616               "before _run()\n");
2617   //ret_value = GNUNET_TESTBED_test_run (cur_test_run.name,
2618   //                                     "test_rps.conf",
2619   //                                     num_peers,
2620   //                                     0, NULL, NULL,
2621   //                                     &test_run, NULL);
2622   GNUNET_TESTBED_run (NULL,
2623                       cfg,
2624                       num_peers,
2625                       0, /* event mask */
2626                       NULL,
2627                       NULL,
2628                       &test_run,
2629                       NULL);
2630   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2631               "after _run()\n");
2632   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2633               "gnunet-rps-profiler returned.\n");
2634 }
2635
2636 /**
2637  * Entry point for the testcase, sets up the testbed.
2638  *
2639  * @param argc unused
2640  * @param argv unused
2641  * @return 0 on success
2642  */
2643 int
2644 main (int argc, char *argv[])
2645 {
2646   int ret_value;
2647   struct GNUNET_GETOPT_CommandLineOption options[] = {
2648     GNUNET_GETOPT_option_uint ('n',
2649                                "peers",
2650                                "COUNT",
2651                                gettext_noop ("number of peers to start"),
2652                                &num_peers),
2653     GNUNET_GETOPT_OPTION_END
2654   };
2655
2656   //if (GNUNET_OK !=
2657   //    GNUNET_STRINGS_get_utf8_args (argc, argv,
2658   //                                  &argc, &argv))
2659   //  return 2;
2660   ret_value = 0;
2661   if (GNUNET_OK !=
2662       GNUNET_PROGRAM_run (argc,
2663                           argv,
2664                           "gnunet-rps-profiler",
2665                           gettext_noop ("Measure quality and performance of the RPS service."),
2666                           options,
2667                           &run,
2668                           NULL))
2669   {
2670     ret_value = 1;
2671   }
2672   if (GNUNET_OK != ret_value)
2673   {
2674     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2675                 "Test did not run successfully!\n");
2676   }
2677
2678   ret_value = cur_test_run.eval_cb();
2679   if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
2680   {
2681     GNUNET_array_grow (rps_peers->cur_view,
2682                        rps_peers->cur_view_count,
2683                        0);
2684   }
2685   GNUNET_free (rps_peers);
2686   GNUNET_free (rps_peer_ids);
2687   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
2688   printf ("test -1\n");
2689   return ret_value;
2690 }
2691
2692 /* end of test_rps.c */