63a6007aed3724184714fc4433123dbd41c72acb
[oweals/gnunet.git] / src / rps / test_rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2012 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file rps/test_rps.c
20  * @brief Testcase for the random peer sampling service.  Starts
21  *        a peergroup with a given number of peers, then waits to
22  *        receive size pushes/pulls from each peer.  Expects to wait
23  *        for one message from each peer.
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_testbed_service.h"
28
29 #include "gnunet_rps_service.h"
30 #include "rps-test_util.h"
31 #include "gnunet-service-rps_sampler_elem.h"
32
33 #include <inttypes.h>
34
35
36 /**
37  * How many peers do we start?
38  */
39 static uint32_t num_peers;
40
41 /**
42  * How long do we run the test?
43  * In seconds.
44  */
45 static uint32_t timeout_s;
46
47 /**
48  * How long do we run the test?
49  */
50 //#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
51 static struct GNUNET_TIME_Relative timeout;
52
53
54 /**
55  * Portion of malicious peers
56  */
57 static double portion = .1;
58
59 /**
60  * Type of malicious peer to test
61  */
62 static unsigned int mal_type = 0;
63
64 /**
65  * Handles to all of the running peers
66  */
67 static struct GNUNET_TESTBED_Peer **testbed_peers;
68
69 /**
70  * @brief Indicates whether peer should go off- or online
71  */
72 enum PEER_ONLINE_DELTA {
73   /**
74    * @brief Indicates peer going online
75    */
76   PEER_GO_ONLINE = 1,
77   /**
78    * @brief Indicates peer going offline
79    */
80   PEER_GO_OFFLINE = -1,
81 };
82
83 /**
84  * Operation map entry
85  */
86 struct OpListEntry
87 {
88   /**
89    * DLL next ptr
90    */
91   struct OpListEntry *next;
92
93   /**
94    * DLL prev ptr
95    */
96   struct OpListEntry *prev;
97
98   /**
99    * The testbed operation
100    */
101   struct GNUNET_TESTBED_Operation *op;
102
103   /**
104    * Depending on whether we start or stop RPS service at the peer, set this to
105    * #PEER_GO_ONLINE (1) or #PEER_GO_OFFLINE (-1)
106    */
107   enum PEER_ONLINE_DELTA delta;
108
109   /**
110    * Index of the regarding peer
111    */
112   unsigned int index;
113 };
114
115 /**
116  * OpList DLL head
117  */
118 static struct OpListEntry *oplist_head;
119
120 /**
121  * OpList DLL tail
122  */
123 static struct OpListEntry *oplist_tail;
124
125
126 /**
127  * A pending reply: A request was sent and the reply is pending.
128  */
129 struct PendingReply
130 {
131   /**
132    * DLL next,prev ptr
133    */
134   struct PendingReply *next;
135   struct PendingReply *prev;
136
137   /**
138    * Handle to the request we are waiting for
139    */
140   struct GNUNET_RPS_Request_Handle *req_handle;
141
142   /**
143    * The peer that requested
144    */
145   struct RPSPeer *rps_peer;
146 };
147
148
149 /**
150  * A pending request: A request was not made yet but is scheduled for later.
151  */
152 struct PendingRequest
153 {
154   /**
155    * DLL next,prev ptr
156    */
157   struct PendingRequest *next;
158   struct PendingRequest *prev;
159
160   /**
161    * Handle to the request we are waiting for
162    */
163   struct GNUNET_SCHEDULER_Task *request_task;
164
165   /**
166    * The peer that requested
167    */
168   struct RPSPeer *rps_peer;
169 };
170
171
172 /**
173  * Information we track for each peer.
174  */
175 struct RPSPeer
176 {
177   /**
178    * Index of the peer.
179    */
180   unsigned int index;
181
182   /**
183    * Handle for RPS connect operation.
184    */
185   struct GNUNET_TESTBED_Operation *op;
186
187   /**
188    * Handle to RPS service.
189    */
190   struct GNUNET_RPS_Handle *rps_handle;
191
192   /**
193    * Handle to stream requests
194    */
195   struct GNUNET_RPS_StreamRequestHandle *rps_srh;
196
197   /**
198    * ID of the peer.
199    */
200   struct GNUNET_PeerIdentity *peer_id;
201
202   /**
203    * A request handle to check for an request
204    */
205   //struct GNUNET_RPS_Request_Handle *req_handle;
206
207   /**
208    * Peer on- or offline?
209    */
210   int online;
211
212   /**
213    * Number of Peer IDs to request during the whole test
214    */
215   unsigned int num_ids_to_request;
216
217   /**
218    * Pending requests DLL
219    */
220   struct PendingRequest *pending_req_head;
221   struct PendingRequest *pending_req_tail;
222
223   /**
224    * Number of pending requests
225    */
226   unsigned int num_pending_reqs;
227
228   /**
229    * Pending replies DLL
230    */
231   struct PendingReply *pending_rep_head;
232   struct PendingReply *pending_rep_tail;
233
234   /**
235    * Number of pending replies
236    */
237   unsigned int num_pending_reps;
238
239   /**
240    * Number of received PeerIDs
241    */
242   unsigned int num_recv_ids;
243
244   /**
245    * Pending operation on that peer
246    */
247   const struct OpListEntry *entry_op_manage;
248
249   /**
250    * Testbed operation to connect to statistics service
251    */
252   struct GNUNET_TESTBED_Operation *stat_op;
253
254   /**
255    * Handle to the statistics service
256    */
257   struct GNUNET_STATISTICS_Handle *stats_h;
258
259   /**
260    * @brief flags to indicate which statistics values have been already
261    * collected from the statistics service.
262    * Used to check whether we are able to shutdown.
263    */
264   uint32_t stat_collected_flags;
265
266   /**
267    * @brief File name of the file the stats are finally written to
268    */
269   const char *file_name_stats;
270
271   /**
272    * @brief File name of the file the stats are finally written to
273    */
274   const char *file_name_probs;
275
276   /**
277    * @brief The current view
278    */
279   struct GNUNET_PeerIdentity *cur_view;
280
281   /**
282    * @brief Number of peers in the #cur_view.
283    */
284   uint32_t cur_view_count;
285
286   /**
287    * @brief Number of occurrences in other peer's view
288    */
289   uint32_t count_in_views;
290
291   /**
292    * @brief statistics values
293    */
294   uint64_t num_rounds;
295   uint64_t num_blocks;
296   uint64_t num_blocks_many_push;
297   uint64_t num_blocks_no_push;
298   uint64_t num_blocks_no_pull;
299   uint64_t num_blocks_many_push_no_pull;
300   uint64_t num_blocks_no_push_no_pull;
301   uint64_t num_issued_push;
302   uint64_t num_issued_pull_req;
303   uint64_t num_issued_pull_rep;
304   uint64_t num_sent_push;
305   uint64_t num_sent_pull_req;
306   uint64_t num_sent_pull_rep;
307   uint64_t num_recv_push;
308   uint64_t num_recv_pull_req;
309   uint64_t num_recv_pull_rep;
310 };
311
312 enum STAT_TYPE
313 {
314   STAT_TYPE_ROUNDS                    =    0x1, /*   1 */
315   STAT_TYPE_BLOCKS                    =    0x2, /*   2 */
316   STAT_TYPE_BLOCKS_MANY_PUSH          =    0x4, /*   3 */
317   STAT_TYPE_BLOCKS_NO_PUSH            =    0x8, /*   4 */
318   STAT_TYPE_BLOCKS_NO_PULL            =   0x10, /*   5 */
319   STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL  =   0x20, /*   6 */
320   STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL    =   0x40, /*   7 */
321   STAT_TYPE_ISSUED_PUSH_SEND          =   0x80, /*   8 */
322   STAT_TYPE_ISSUED_PULL_REQ           =  0x100, /*   9 */
323   STAT_TYPE_ISSUED_PULL_REP           =  0x200, /*  10 */
324   STAT_TYPE_SENT_PUSH_SEND            =  0x400, /*  11 */
325   STAT_TYPE_SENT_PULL_REQ             =  0x800, /*  12 */
326   STAT_TYPE_SENT_PULL_REP             = 0x1000, /*  13 */
327   STAT_TYPE_RECV_PUSH_SEND            = 0x2000, /*  14 */
328   STAT_TYPE_RECV_PULL_REQ             = 0x4000, /*  15 */
329   STAT_TYPE_RECV_PULL_REP             = 0x8000, /*  16 */
330   STAT_TYPE_MAX          = 0x80000000, /*  32 */
331 };
332
333 struct STATcls
334 {
335   struct RPSPeer *rps_peer;
336   enum STAT_TYPE stat_type;
337 };
338
339
340 /**
341  * Information for all the peers.
342  */
343 static struct RPSPeer *rps_peers;
344
345 /**
346  * Peermap to get the index of a given peer ID quick.
347  */
348 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
349
350 /**
351  * IDs of the peers.
352  */
353 static struct GNUNET_PeerIdentity *rps_peer_ids;
354
355 /**
356  * ID of the targeted peer.
357  */
358 static struct GNUNET_PeerIdentity *target_peer;
359
360 /**
361  * ID of the peer that requests for the evaluation.
362  */
363 static struct RPSPeer *eval_peer;
364
365 /**
366  * Number of online peers.
367  */
368 static unsigned int num_peers_online;
369
370 /**
371  * @brief The added sizes of the peer's views
372  */
373 static unsigned int view_sizes;
374
375 /**
376  * Return value from 'main'.
377  */
378 static int ok;
379
380 /**
381  * Identifier for the churn task that runs periodically
382  */
383 static struct GNUNET_SCHEDULER_Task *post_test_task;
384
385 /**
386  * Identifier for the churn task that runs periodically
387  */
388 static struct GNUNET_SCHEDULER_Task *shutdown_task;
389
390 /**
391  * Identifier for the churn task that runs periodically
392  */
393 static struct GNUNET_SCHEDULER_Task *churn_task;
394
395 /**
396  * Called to initialise the given RPSPeer
397  */
398 typedef void (*InitPeer) (struct RPSPeer *rps_peer);
399
400 /**
401  * @brief Called directly after connecting to the service
402  *
403  * @param rps_peer Specific peer the function is called on
404  * @param h the handle to the rps service
405  */
406 typedef void (*PreTest) (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h);
407
408 /**
409  * @brief Executes functions to test the api/service for a given peer
410  *
411  * Called from within #rps_connect_complete_cb ()
412  * Implemented by #churn_test_cb, #profiler_cb, #mal_cb, #single_req_cb,
413  * #delay_req_cb, #seed_big_cb, #single_peer_seed_cb, #seed_cb, #req_cancel_cb
414  *
415  * @param rps_peer the peer the task runs on
416  */
417 typedef void (*MainTest) (struct RPSPeer *rps_peer);
418
419 /**
420  * Callback called once the requested random peers are available
421  */
422 typedef void (*ReplyHandle) (void *cls,
423                              uint64_t n,
424                              const struct GNUNET_PeerIdentity *recv_peers);
425
426 /**
427  * Called directly before disconnecting from the service
428  */
429 typedef void (*PostTest) (struct RPSPeer *peer);
430
431 /**
432  * Function called after disconnect to evaluate test success
433  */
434 typedef int (*EvaluationCallback) (void);
435
436 /**
437  * @brief Do we have Churn?
438  */
439 enum OPTION_CHURN {
440   /**
441    * @brief If we have churn this is set
442    */
443   HAVE_CHURN,
444   /**
445    * @brief If we have no churn this is set
446    */
447   HAVE_NO_CHURN,
448 };
449
450 /**
451  * @brief Is it ok to quit the test before the timeout?
452  */
453 enum OPTION_QUICK_QUIT {
454   /**
455    * @brief It is ok for the test to quit before the timeout triggers
456    */
457   HAVE_QUICK_QUIT,
458
459   /**
460    * @brief It is NOT ok for the test to quit before the timeout triggers
461    */
462   HAVE_NO_QUICK_QUIT,
463 };
464
465 /**
466  * @brief Do we collect statistics at the end?
467  */
468 enum OPTION_COLLECT_STATISTICS {
469   /**
470    * @brief We collect statistics at the end
471    */
472   COLLECT_STATISTICS,
473
474   /**
475    * @brief We do not collect statistics at the end
476    */
477   NO_COLLECT_STATISTICS,
478 };
479
480 /**
481  * @brief Do we collect views during run?
482  */
483 enum OPTION_COLLECT_VIEW {
484   /**
485    * @brief We collect view during run
486    */
487   COLLECT_VIEW,
488
489   /**
490    * @brief We do not collect the view during run
491    */
492   NO_COLLECT_VIEW,
493 };
494
495 /**
496  * Structure to define a single test
497  */
498 struct SingleTestRun
499 {
500   /**
501    * Name of the test
502    */
503   char *name;
504
505   /**
506    * Called with a single peer in order to initialise that peer
507    */
508   InitPeer init_peer;
509
510   /**
511    * Called directly after connecting to the service
512    */
513   PreTest pre_test;
514
515   /**
516    * Main function for each peer
517    */
518   MainTest main_test;
519
520   /**
521    * Callback called once the requested peers are available
522    */
523   ReplyHandle reply_handle;
524
525   /**
526    * Called directly before disconnecting from the service
527    */
528   PostTest post_test;
529
530   /**
531    * Function to evaluate the test results
532    */
533   EvaluationCallback eval_cb;
534
535   /**
536    * Request interval
537    */
538   uint32_t request_interval;
539
540   /**
541    * Number of Requests to make.
542    */
543   uint32_t num_requests;
544
545   /**
546    * Run with (-out) churn
547    */
548   enum OPTION_CHURN have_churn;
549
550   /**
551    * Quit test before timeout?
552    */
553   enum OPTION_QUICK_QUIT have_quick_quit;
554
555   /**
556    * Collect statistics at the end?
557    */
558   enum OPTION_COLLECT_STATISTICS have_collect_statistics;
559
560   /**
561    * Collect view during run?
562    */
563   enum OPTION_COLLECT_VIEW have_collect_view;
564
565   /**
566    * @brief Mark which values from the statistics service to collect at the end
567    * of the run
568    */
569   uint32_t stat_collect_flags;
570 } cur_test_run;
571
572 /**
573  * Did we finish the test?
574  */
575 static int post_test;
576
577 /**
578  * Are we shutting down?
579  */
580 static int in_shutdown;
581
582 /**
583  * Append arguments to file
584  */
585 static void
586 tofile_ (const char *file_name, const char *line)
587 {
588   struct GNUNET_DISK_FileHandle *f;
589   /* char output_buffer[512]; */
590   size_t size;
591   /* int size; */
592   size_t size2;
593
594   if (NULL == (f = GNUNET_DISK_file_open (file_name,
595                                           GNUNET_DISK_OPEN_APPEND |
596                                           GNUNET_DISK_OPEN_WRITE |
597                                           GNUNET_DISK_OPEN_CREATE,
598                                           GNUNET_DISK_PERM_USER_READ |
599                                           GNUNET_DISK_PERM_USER_WRITE |
600                                           GNUNET_DISK_PERM_GROUP_READ |
601                                           GNUNET_DISK_PERM_OTHER_READ)))
602   {
603     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
604                 "Not able to open file %s\n",
605                 file_name);
606     return;
607   }
608   /* size = GNUNET_snprintf (output_buffer,
609                           sizeof (output_buffer),
610                           "%llu %s\n",
611                           GNUNET_TIME_absolute_get ().abs_value_us,
612                           line);
613   if (0 > size)
614   {
615     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
616                 "Failed to write string to buffer (size: %i)\n",
617                 size);
618     return;
619   } */
620
621   size = strlen (line) * sizeof (char);
622
623   size2 = GNUNET_DISK_file_write (f, line, size);
624   if (size != size2)
625   {
626     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
627                 "Unable to write to file! (Size: %lu, size2: %lu)\n",
628                 size,
629                 size2);
630     if (GNUNET_YES != GNUNET_DISK_file_close (f))
631     {
632       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
633                   "Unable to close file\n");
634     }
635     return;
636   }
637
638   if (GNUNET_YES != GNUNET_DISK_file_close (f))
639   {
640     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
641                 "Unable to close file\n");
642   }
643 }
644
645 /**
646  * This function is used to facilitate writing important information to disk
647  */
648 #define tofile(file_name, ...) do {\
649   char tmp_buf[512];\
650     int size;\
651     size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
652     if (0 > size)\
653       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
654                      "Failed to create tmp_buf\n");\
655     else\
656       tofile_(file_name,tmp_buf);\
657   } while (0);
658
659
660 /**
661  * Write the ids and their according index in the given array to a file
662  * Unused
663  */
664 /* static void
665 ids_to_file (char *file_name,
666              struct GNUNET_PeerIdentity *peer_ids,
667              unsigned int num_peer_ids)
668 {
669   unsigned int i;
670
671   for (i=0 ; i < num_peer_ids ; i++)
672   {
673     to_file (file_name,
674              "%u\t%s",
675              i,
676              GNUNET_i2s_full (&peer_ids[i]));
677   }
678 } */
679
680 /**
681  * Task run on timeout to collect statistics and potentially shut down.
682  */
683 static void
684 post_test_op (void *cls);
685
686
687 /**
688  * Test the success of a single test
689  */
690 static int
691 evaluate (void)
692 {
693   unsigned int i;
694   int tmp_ok;
695
696   tmp_ok = 1;
697
698   for (i = 0; i < num_peers; i++)
699   {
700     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
701         "%u. peer [%s] received %u of %u expected peer_ids: %i\n",
702         i,
703         GNUNET_i2s (rps_peers[i].peer_id),
704         rps_peers[i].num_recv_ids,
705         rps_peers[i].num_ids_to_request,
706         (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids));
707     tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids);
708   }
709   return tmp_ok? 0 : 1;
710 }
711
712
713 /**
714  * Creates an oplist entry and adds it to the oplist DLL
715  */
716 static struct OpListEntry *
717 make_oplist_entry ()
718 {
719   struct OpListEntry *entry;
720
721   entry = GNUNET_new (struct OpListEntry);
722   GNUNET_CONTAINER_DLL_insert_tail (oplist_head, oplist_tail, entry);
723   return entry;
724 }
725
726
727 /**
728  * @brief Checks if given peer already received its statistics value from the
729  * statistics service.
730  *
731  * @param rps_peer the peer to check for
732  *
733  * @return #GNUNET_YES if so
734  *         #GNUNET_NO otherwise
735  */
736 static int check_statistics_collect_completed_single_peer (
737     const struct RPSPeer *rps_peer)
738 {
739   if (cur_test_run.stat_collect_flags !=
740         (cur_test_run.stat_collect_flags &
741           rps_peer->stat_collected_flags))
742   {
743     return GNUNET_NO;
744   }
745   return GNUNET_YES;
746 }
747
748
749 /**
750  * @brief Checks if all peers already received their statistics value from the
751  * statistics service.
752  *
753  * @return #GNUNET_YES if so
754  *         #GNUNET_NO otherwise
755  */
756 static int check_statistics_collect_completed ()
757 {
758   uint32_t i;
759
760   for (i = 0; i < num_peers; i++)
761   {
762     if (GNUNET_NO == check_statistics_collect_completed_single_peer (&rps_peers[i]))
763     {
764       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
765           "At least Peer %" PRIu32 " did not yet receive all statistics values\n",
766           i);
767       return GNUNET_NO;
768     }
769   }
770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771       "All peers received their statistics values\n");
772   return GNUNET_YES;
773 }
774
775
776 /**
777  * Task run on timeout to shut everything down.
778  */
779 static void
780 shutdown_op (void *cls)
781 {
782   unsigned int i;
783   (void) cls;
784
785   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
786               "Shutdown task scheduled, going down.\n");
787   in_shutdown = GNUNET_YES;
788   if (NULL != post_test_task)
789   {
790     GNUNET_SCHEDULER_cancel (post_test_task);
791     post_test_op (NULL);
792   }
793   if (NULL != churn_task)
794   {
795     GNUNET_SCHEDULER_cancel (churn_task);
796     churn_task = NULL;
797   }
798   for (i = 0; i < num_peers; i++)
799   {
800     if (NULL != rps_peers[i].rps_handle)
801     {
802       GNUNET_RPS_disconnect (rps_peers[i].rps_handle);
803     }
804     if (NULL != rps_peers[i].op)
805     {
806       GNUNET_TESTBED_operation_done (rps_peers[i].op);
807     }
808   }
809 }
810
811
812 /**
813  * Task run on timeout to collect statistics and potentially shut down.
814  */
815 static void
816 post_test_op (void *cls)
817 {
818   unsigned int i;
819   (void) cls;
820
821   post_test_task = NULL;
822   post_test = GNUNET_YES;
823   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
824               "Post test task scheduled, going down.\n");
825   if (NULL != churn_task)
826   {
827     GNUNET_SCHEDULER_cancel (churn_task);
828     churn_task = NULL;
829   }
830   for (i = 0; i < num_peers; i++)
831   {
832     if (NULL != cur_test_run.post_test)
833     {
834       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n", i);
835       cur_test_run.post_test (&rps_peers[i]);
836     }
837     if (NULL != rps_peers[i].op)
838     {
839       GNUNET_TESTBED_operation_done (rps_peers[i].op);
840       rps_peers[i].op = NULL;
841     }
842   }
843   /* If we do not collect statistics, shut down directly */
844   if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics ||
845       GNUNET_YES == check_statistics_collect_completed())
846   {
847     GNUNET_SCHEDULER_shutdown ();
848   }
849 }
850
851
852 /**
853  * Seed peers.
854  */
855 static void
856 seed_peers (void *cls)
857 {
858   struct RPSPeer *peer = cls;
859   unsigned int amount;
860   unsigned int i;
861
862   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
863   {
864     return;
865   }
866
867   GNUNET_assert (NULL != peer->rps_handle);
868
869   // TODO if malicious don't seed mal peers
870   amount = round (.5 * num_peers);
871
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding peers:\n");
873   for (i = 0 ; i < amount ; i++)
874     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
875                 i,
876                 GNUNET_i2s (&rps_peer_ids[i]));
877
878   GNUNET_RPS_seed_ids (peer->rps_handle, amount, rps_peer_ids);
879 }
880
881
882 /**
883  * Seed peers.
884  */
885 static void
886 seed_peers_big (void *cls)
887 {
888   struct RPSPeer *peer = cls;
889   unsigned int seed_msg_size;
890   uint32_t num_peers_max;
891   unsigned int amount;
892   unsigned int i;
893
894   seed_msg_size = 8; /* sizeof (struct GNUNET_RPS_CS_SeedMessage) */
895   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE - seed_msg_size) /
896     sizeof (struct GNUNET_PeerIdentity);
897   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
898       "Peers that fit in one seed msg; %u\n",
899       num_peers_max);
900   amount = num_peers_max + (0.5 * num_peers_max);
901   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
902       "Seeding many (%u) peers:\n",
903       amount);
904   struct GNUNET_PeerIdentity ids_to_seed[amount];
905   for (i = 0; i < amount; i++)
906   {
907     ids_to_seed[i] = *peer->peer_id;
908     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
909                 i,
910                 GNUNET_i2s (&ids_to_seed[i]));
911   }
912
913   GNUNET_RPS_seed_ids (peer->rps_handle, amount, ids_to_seed);
914 }
915
916 /**
917  * Get the id of peer i.
918  */
919   void
920 info_cb (void *cb_cls,
921          struct GNUNET_TESTBED_Operation *op,
922          const struct GNUNET_TESTBED_PeerInformation *pinfo,
923          const char *emsg)
924 {
925   struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
926   (void) op;
927
928   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
929   {
930     return;
931   }
932
933   if (NULL == pinfo || NULL != emsg)
934   {
935     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg);
936     GNUNET_TESTBED_operation_done (entry->op);
937     return;
938   }
939
940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941               "Peer %u is %s\n",
942               entry->index,
943               GNUNET_i2s (pinfo->result.id));
944
945   rps_peer_ids[entry->index] = *(pinfo->result.id);
946   rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index];
947
948   GNUNET_assert (GNUNET_OK ==
949       GNUNET_CONTAINER_multipeermap_put (peer_map,
950         &rps_peer_ids[entry->index],
951         &rps_peers[entry->index],
952         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
953   tofile ("/tmp/rps/peer_ids",
954            "%u\t%s\n",
955            entry->index,
956            GNUNET_i2s_full (&rps_peer_ids[entry->index]));
957
958   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
959   GNUNET_TESTBED_operation_done (entry->op);
960   GNUNET_free (entry);
961 }
962
963
964 /**
965  * Callback to be called when RPS service connect operation is completed
966  *
967  * @param cls the callback closure from functions generating an operation
968  * @param op the operation that has been finished
969  * @param ca_result the RPS service handle returned from rps_connect_adapter
970  * @param emsg error message in case the operation has failed; will be NULL if
971  *          operation has executed successfully.
972  */
973 static void
974 rps_connect_complete_cb (void *cls,
975                          struct GNUNET_TESTBED_Operation *op,
976                          void *ca_result,
977                          const char *emsg)
978 {
979   struct RPSPeer *rps_peer = cls;
980   struct GNUNET_RPS_Handle *rps = ca_result;
981
982   GNUNET_assert (NULL != ca_result);
983
984   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
985   {
986     return;
987   }
988
989   rps_peer->rps_handle = rps;
990   rps_peer->online = GNUNET_YES;
991   num_peers_online++;
992
993   GNUNET_assert (op == rps_peer->op);
994   if (NULL != emsg)
995   {
996     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
997                 "Failed to connect to RPS service: %s\n",
998                 emsg);
999     ok = 1;
1000     GNUNET_SCHEDULER_shutdown ();
1001     return;
1002   }
1003
1004   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Started client successfully\n");
1005
1006   cur_test_run.main_test (rps_peer);
1007 }
1008
1009
1010 /**
1011  * Adapter function called to establish a connection to
1012  * the RPS service.
1013  *
1014  * @param cls closure
1015  * @param cfg configuration of the peer to connect to; will be available until
1016  *          GNUNET_TESTBED_operation_done() is called on the operation returned
1017  *          from GNUNET_TESTBED_service_connect()
1018  * @return service handle to return in 'op_result', NULL on error
1019  */
1020 static void *
1021 rps_connect_adapter (void *cls,
1022                                  const struct GNUNET_CONFIGURATION_Handle *cfg)
1023 {
1024   struct GNUNET_RPS_Handle *h;
1025
1026   h = GNUNET_RPS_connect (cfg);
1027   GNUNET_assert (NULL != h);
1028
1029   if (NULL != cur_test_run.pre_test)
1030     cur_test_run.pre_test (cls, h);
1031   GNUNET_assert (NULL != h);
1032
1033   return h;
1034 }
1035
1036 /**
1037  * Called to open a connection to the peer's statistics
1038  *
1039  * @param cls peer context
1040  * @param cfg configuration of the peer to connect to; will be available until
1041  *          GNUNET_TESTBED_operation_done() is called on the operation returned
1042  *          from GNUNET_TESTBED_service_connect()
1043  * @return service handle to return in 'op_result', NULL on error
1044  */
1045 static void *
1046 stat_connect_adapter (void *cls,
1047                       const struct GNUNET_CONFIGURATION_Handle *cfg)
1048 {
1049   struct RPSPeer *peer = cls;
1050
1051   peer->stats_h = GNUNET_STATISTICS_create ("rps-profiler", cfg);
1052   return peer->stats_h;
1053 }
1054
1055 /**
1056  * Called to disconnect from peer's statistics service
1057  *
1058  * @param cls peer context
1059  * @param op_result service handle returned from the connect adapter
1060  */
1061 static void
1062 stat_disconnect_adapter (void *cls, void *op_result)
1063 {
1064   struct RPSPeer *peer = cls;
1065
1066   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
1067   //              (peer->stats_h, "core", "# peers connected",
1068   //               stat_iterator, peer));
1069   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
1070   //              (peer->stats_h, "nse", "# peers connected",
1071   //               stat_iterator, peer));
1072   GNUNET_STATISTICS_destroy (op_result, GNUNET_NO);
1073   peer->stats_h = NULL;
1074 }
1075
1076 /**
1077  * Called after successfully opening a connection to a peer's statistics
1078  * service; we register statistics monitoring for CORE and NSE here.
1079  *
1080  * @param cls the callback closure from functions generating an operation
1081  * @param op the operation that has been finished
1082  * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
1083  * @param emsg error message in case the operation has failed; will be NULL if
1084  *          operation has executed successfully.
1085  */
1086 static void
1087 stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
1088                   void *ca_result, const char *emsg )
1089 {
1090   //struct GNUNET_STATISTICS_Handle *sh = ca_result;
1091   //struct RPSPeer *peer = (struct RPSPeer *) cls;
1092   (void) cls;
1093   (void) op;
1094   (void) ca_result;
1095
1096   if (NULL != emsg)
1097   {
1098     GNUNET_break (0);
1099     return;
1100   }
1101   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
1102   //              (sh, "core", "# peers connected",
1103   //               stat_iterator, peer));
1104   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
1105   //              (sh, "nse", "# peers connected",
1106   //               stat_iterator, peer));
1107 }
1108
1109
1110 /**
1111  * Adapter function called to destroy connection to
1112  * RPS service.
1113  *
1114  * @param cls closure
1115  * @param op_result service handle returned from the connect adapter
1116  */
1117 static void
1118 rps_disconnect_adapter (void *cls,
1119                                           void *op_result)
1120 {
1121   struct RPSPeer *peer = cls;
1122   struct GNUNET_RPS_Handle *h = op_result;
1123
1124   if (NULL != peer->rps_srh)
1125   {
1126     GNUNET_RPS_stream_cancel (peer->rps_srh);
1127     peer->rps_srh = NULL;
1128   }
1129   GNUNET_assert (NULL != peer);
1130   GNUNET_RPS_disconnect (h);
1131   peer->rps_handle = NULL;
1132 }
1133
1134
1135 /***********************************************************************
1136  * Definition of tests
1137 ***********************************************************************/
1138
1139 // TODO check whether tests can be stopped earlier
1140 static int
1141 default_eval_cb (void)
1142 {
1143   return evaluate ();
1144 }
1145
1146 static int
1147 no_eval (void)
1148 {
1149   return 0;
1150 }
1151
1152 /**
1153  * Initialise given RPSPeer
1154  */
1155 static void default_init_peer (struct RPSPeer *rps_peer)
1156 {
1157   rps_peer->num_ids_to_request = 1;
1158 }
1159
1160 /**
1161  * Callback to call on receipt of a reply
1162  *
1163  * @param cls closure
1164  * @param n number of peers
1165  * @param recv_peers the received peers
1166  */
1167 static void
1168 default_reply_handle (void *cls,
1169                       uint64_t n,
1170                       const struct GNUNET_PeerIdentity *recv_peers)
1171 {
1172   struct RPSPeer *rps_peer;
1173   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1174   unsigned int i;
1175
1176   rps_peer = pending_rep->rps_peer;
1177   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1178                                rps_peer->pending_rep_tail,
1179                                pending_rep);
1180   rps_peer->num_pending_reps--;
1181   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182               "[%s] got %" PRIu64 " peers:\n",
1183               GNUNET_i2s (rps_peer->peer_id),
1184               n);
1185
1186   for (i = 0; i < n; i++)
1187   {
1188     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189                 "%u: %s\n",
1190                 i,
1191                 GNUNET_i2s (&recv_peers[i]));
1192
1193     rps_peer->num_recv_ids++;
1194   }
1195
1196   if (0 == evaluate () && HAVE_QUICK_QUIT == cur_test_run.have_quick_quit)
1197   {
1198     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test succeeded before timeout\n");
1199     GNUNET_assert (NULL != post_test_task);
1200     GNUNET_SCHEDULER_cancel (post_test_task);
1201     post_test_task = GNUNET_SCHEDULER_add_now (&post_test_op, NULL);
1202     GNUNET_assert (NULL!= post_test_task);
1203   }
1204 }
1205
1206 /**
1207  * Request random peers.
1208  */
1209 static void
1210 request_peers (void *cls)
1211 {
1212   struct PendingRequest *pending_req = cls;
1213   struct RPSPeer *rps_peer;
1214   struct PendingReply *pending_rep;
1215
1216   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1217     return;
1218   rps_peer = pending_req->rps_peer;
1219   GNUNET_assert (1 <= rps_peer->num_pending_reqs);
1220   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
1221                                rps_peer->pending_req_tail,
1222                                pending_req);
1223   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224               "Requesting one peer\n");
1225   pending_rep = GNUNET_new (struct PendingReply);
1226   pending_rep->rps_peer = rps_peer;
1227   pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
1228       1,
1229       cur_test_run.reply_handle,
1230       pending_rep);
1231   GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
1232                                     rps_peer->pending_rep_tail,
1233                                     pending_rep);
1234   rps_peer->num_pending_reps++;
1235   rps_peer->num_pending_reqs--;
1236 }
1237
1238 static void
1239 cancel_pending_req (struct PendingRequest *pending_req)
1240 {
1241   struct RPSPeer *rps_peer;
1242
1243   rps_peer = pending_req->rps_peer;
1244   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
1245                                rps_peer->pending_req_tail,
1246                                pending_req);
1247   rps_peer->num_pending_reqs--;
1248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1249               "Cancelling pending request\n");
1250   GNUNET_SCHEDULER_cancel (pending_req->request_task);
1251   GNUNET_free (pending_req);
1252 }
1253
1254 static void
1255 cancel_request (struct PendingReply *pending_rep)
1256 {
1257   struct RPSPeer *rps_peer;
1258
1259   rps_peer = pending_rep->rps_peer;
1260   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1261                                rps_peer->pending_rep_tail,
1262                                pending_rep);
1263   rps_peer->num_pending_reps--;
1264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1265               "Cancelling request\n");
1266   GNUNET_RPS_request_cancel (pending_rep->req_handle);
1267   GNUNET_free (pending_rep);
1268 }
1269
1270 /**
1271  * Cancel a request.
1272  */
1273 static void
1274 cancel_request_cb (void *cls)
1275 {
1276   struct RPSPeer *rps_peer = cls;
1277   struct PendingReply *pending_rep;
1278
1279   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1280     return;
1281   pending_rep = rps_peer->pending_rep_head;
1282   GNUNET_assert (1 <= rps_peer->num_pending_reps);
1283   cancel_request (pending_rep);
1284 }
1285
1286
1287 /**
1288  * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
1289  * issued, nor replied
1290  */
1291 void
1292 schedule_missing_requests (struct RPSPeer *rps_peer)
1293 {
1294   unsigned int i;
1295   struct PendingRequest *pending_req;
1296
1297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298       "Scheduling %u - %u missing requests\n",
1299       rps_peer->num_ids_to_request,
1300       rps_peer->num_pending_reqs + rps_peer->num_pending_reps);
1301   GNUNET_assert (rps_peer->num_pending_reqs + rps_peer->num_pending_reps <=
1302       rps_peer->num_ids_to_request);
1303   for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
1304        i < rps_peer->num_ids_to_request; i++)
1305   {
1306     pending_req = GNUNET_new (struct PendingRequest);
1307     pending_req->rps_peer = rps_peer;
1308     pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
1309         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1310           cur_test_run.request_interval * i),
1311         request_peers,
1312         pending_req);
1313     GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
1314                                       rps_peer->pending_req_tail,
1315                                       pending_req);
1316     rps_peer->num_pending_reqs++;
1317   }
1318 }
1319
1320 void
1321 cancel_pending_req_rep (struct RPSPeer *rps_peer)
1322 {
1323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324       "Cancelling all (pending) requests.\n");
1325   while (NULL != rps_peer->pending_req_head)
1326     cancel_pending_req (rps_peer->pending_req_head);
1327   GNUNET_assert (0 == rps_peer->num_pending_reqs);
1328   while (NULL != rps_peer->pending_rep_head)
1329     cancel_request (rps_peer->pending_rep_head);
1330   GNUNET_assert (0 == rps_peer->num_pending_reps);
1331 }
1332
1333 /***********************************
1334  * MALICIOUS
1335 ***********************************/
1336
1337 /**
1338  * Initialise only non-mal RPSPeers
1339  */
1340 static void mal_init_peer (struct RPSPeer *rps_peer)
1341 {
1342   if (rps_peer->index >= round (portion * num_peers))
1343     rps_peer->num_ids_to_request = 1;
1344 }
1345
1346
1347 /**
1348  * @brief Set peers to (non-)malicious before execution
1349  *
1350  * Of signature #PreTest
1351  *
1352  * @param rps_peer the peer to set (non-) malicious
1353  * @param h the handle to the service
1354  */
1355 static void
1356 mal_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
1357 {
1358   #ifdef ENABLE_MALICIOUS
1359   uint32_t num_mal_peers;
1360
1361   GNUNET_assert ( (1 >= portion) &&
1362                   (0 <  portion) );
1363   num_mal_peers = round (portion * num_peers);
1364
1365   if (rps_peer->index < num_mal_peers)
1366   {
1367     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1368                 "%u. peer [%s] of %" PRIu32 " malicious peers turning malicious\n",
1369                 rps_peer->index,
1370                 GNUNET_i2s (rps_peer->peer_id),
1371                 num_mal_peers);
1372
1373     GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
1374                               rps_peer_ids, target_peer);
1375   }
1376   #endif /* ENABLE_MALICIOUS */
1377 }
1378
1379 static void
1380 mal_cb (struct RPSPeer *rps_peer)
1381 {
1382   uint32_t num_mal_peers;
1383
1384   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1385   {
1386     return;
1387   }
1388
1389   #ifdef ENABLE_MALICIOUS
1390   GNUNET_assert ( (1 >= portion) &&
1391                   (0 <  portion) );
1392   num_mal_peers = round (portion * num_peers);
1393
1394   if (rps_peer->index >= num_mal_peers)
1395   { /* It's useless to ask a malicious peer about a random sample -
1396        it's not sampling */
1397     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1398                                   seed_peers, rps_peer);
1399     schedule_missing_requests (rps_peer);
1400   }
1401   #endif /* ENABLE_MALICIOUS */
1402 }
1403
1404
1405 /***********************************
1406  * SINGLE_REQUEST
1407 ***********************************/
1408 static void
1409 single_req_cb (struct RPSPeer *rps_peer)
1410 {
1411   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1412   {
1413     return;
1414   }
1415
1416   schedule_missing_requests (rps_peer);
1417 }
1418
1419 /***********************************
1420  * DELAYED_REQUESTS
1421 ***********************************/
1422 static void
1423 delay_req_cb (struct RPSPeer *rps_peer)
1424 {
1425   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1426   {
1427     return;
1428   }
1429
1430   schedule_missing_requests (rps_peer);
1431 }
1432
1433 /***********************************
1434  * SEED
1435 ***********************************/
1436 static void
1437 seed_cb (struct RPSPeer *rps_peer)
1438 {
1439   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1440   {
1441     return;
1442   }
1443
1444   GNUNET_SCHEDULER_add_delayed (
1445       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
1446       seed_peers, rps_peer);
1447 }
1448
1449 /***********************************
1450  * SEED_BIG
1451 ***********************************/
1452 static void
1453 seed_big_cb (struct RPSPeer *rps_peer)
1454 {
1455   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1456   {
1457     return;
1458   }
1459
1460   // TODO test seeding > GNUNET_MAX_MESSAGE_SIZE peers
1461   GNUNET_SCHEDULER_add_delayed (
1462       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1463       seed_peers_big, rps_peer);
1464 }
1465
1466 /***********************************
1467  * SINGLE_PEER_SEED
1468 ***********************************/
1469 static void
1470 single_peer_seed_cb (struct RPSPeer *rps_peer)
1471 {
1472   (void) rps_peer;
1473   // TODO
1474 }
1475
1476 /***********************************
1477  * SEED_REQUEST
1478 ***********************************/
1479 static void
1480 seed_req_cb (struct RPSPeer *rps_peer)
1481 {
1482   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1483   {
1484     return;
1485   }
1486
1487   GNUNET_SCHEDULER_add_delayed (
1488       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1489       seed_peers, rps_peer);
1490   schedule_missing_requests (rps_peer);
1491 }
1492
1493 //TODO start big mal
1494
1495 /***********************************
1496  * REQUEST_CANCEL
1497 ***********************************/
1498 static void
1499 req_cancel_cb (struct RPSPeer *rps_peer)
1500 {
1501   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1502   {
1503     return;
1504   }
1505
1506   schedule_missing_requests (rps_peer);
1507   GNUNET_SCHEDULER_add_delayed (
1508       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1509                                      (cur_test_run.request_interval + 1)),
1510       cancel_request_cb, rps_peer);
1511 }
1512
1513 /***********************************
1514  * CHURN
1515 ***********************************/
1516
1517 static void
1518 churn (void *cls);
1519
1520 /**
1521  * @brief Starts churn
1522  *
1523  * Has signature of #MainTest
1524  *
1525  * This is not implemented too nicely as this is called for each peer, but we
1526  * only need to call it once. (Yes we check that we only schedule the task
1527  * once.)
1528  *
1529  * @param rps_peer The peer it's called for
1530  */
1531 static void
1532 churn_test_cb (struct RPSPeer *rps_peer)
1533 {
1534   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1535   {
1536     return;
1537   }
1538
1539   /* Start churn */
1540   if (HAVE_CHURN == cur_test_run.have_churn && NULL == churn_task)
1541   {
1542     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1543                 "Starting churn task\n");
1544     churn_task = GNUNET_SCHEDULER_add_delayed (
1545           GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1546           churn,
1547           NULL);
1548   } else {
1549     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550                 "Not starting churn task\n");
1551   }
1552
1553   schedule_missing_requests (rps_peer);
1554 }
1555
1556 /***********************************
1557  * SUB
1558 ***********************************/
1559
1560 static void
1561 got_stream_peer_cb (void *cls,
1562                     uint64_t num_peers,
1563                     const struct GNUNET_PeerIdentity *peers)
1564 {
1565   const struct RPSPeer *rps_peer = cls;
1566
1567   for (uint64_t i = 0; i < num_peers; i++)
1568   {
1569     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570                 "Peer %" PRIu32 " received [%s] from stream.\n",
1571                 rps_peer->index,
1572                 GNUNET_i2s (&peers[i]));
1573     if (0 != rps_peer->index &&
1574         0 == memcmp (&peers[i],
1575                      &rps_peers[0].peer_id,
1576                      sizeof (struct GNUNET_PeerIdentity)))
1577     {
1578       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received a peer id outside sub\n");
1579       ok = 1;
1580     }
1581     else if (0 == rps_peer->index &&
1582              0 != memcmp (&peers[i],
1583                           &rps_peers[0].peer_id,
1584                           sizeof (struct GNUNET_PeerIdentity)))
1585     {
1586       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received a peer id outside sub (lonely)\n");
1587       ok = 1;
1588     }
1589   }
1590 }
1591
1592
1593 static void
1594 sub_post (struct RPSPeer *rps_peer)
1595 {
1596   if (0 != rps_peer->index) GNUNET_RPS_sub_stop (rps_peer->rps_handle, "test");
1597   else GNUNET_RPS_sub_stop (rps_peer->rps_handle, "lonely");
1598 }
1599
1600
1601 static void
1602 sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
1603 {
1604   (void) rps_peer;
1605
1606   if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test");
1607   else GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */
1608   rps_peer->rps_srh = GNUNET_RPS_stream_request (h,
1609                                                  &got_stream_peer_cb,
1610                                                  rps_peer);
1611 }
1612
1613 /***********************************
1614  * PROFILER
1615 ***********************************/
1616
1617 /**
1618  * Callback to be called when RPS service is started or stopped at peers
1619  *
1620  * @param cls NULL
1621  * @param op the operation handle
1622  * @param emsg NULL on success; otherwise an error description
1623  */
1624 static void
1625 churn_cb (void *cls,
1626           struct GNUNET_TESTBED_Operation *op,
1627           const char *emsg)
1628 {
1629   (void) op;
1630   // FIXME
1631   struct OpListEntry *entry = cls;
1632
1633   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1634   {
1635     return;
1636   }
1637
1638   GNUNET_TESTBED_operation_done (entry->op);
1639   if (NULL != emsg)
1640   {
1641     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n");
1642     GNUNET_SCHEDULER_shutdown ();
1643     return;
1644   }
1645   GNUNET_assert (0 != entry->delta);
1646
1647   num_peers_online += entry->delta;
1648
1649   if (PEER_GO_OFFLINE == entry->delta)
1650   { /* Peer hopefully just went offline */
1651     if (GNUNET_YES != rps_peers[entry->index].online)
1652     {
1653       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1654                   "peer %s was expected to go offline but is still marked as online\n",
1655                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1656       GNUNET_break (0);
1657     }
1658     else
1659     {
1660       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661                   "peer %s probably went offline as expected\n",
1662                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1663     }
1664     rps_peers[entry->index].online = GNUNET_NO;
1665   }
1666
1667   else if (PEER_GO_ONLINE < entry->delta)
1668   { /* Peer hopefully just went online */
1669     if (GNUNET_NO != rps_peers[entry->index].online)
1670     {
1671       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1672                   "peer %s was expected to go online but is still marked as offline\n",
1673                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1674       GNUNET_break (0);
1675     }
1676     else
1677     {
1678       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1679                   "peer %s probably went online as expected\n",
1680                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1681       if (NULL != cur_test_run.pre_test)
1682       {
1683         cur_test_run.pre_test (&rps_peers[entry->index],
1684             rps_peers[entry->index].rps_handle);
1685         schedule_missing_requests (&rps_peers[entry->index]);
1686       }
1687     }
1688     rps_peers[entry->index].online = GNUNET_YES;
1689   }
1690   else
1691   {
1692     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1693         "Invalid value for delta: %i\n", entry->delta);
1694     GNUNET_break (0);
1695   }
1696
1697   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
1698   rps_peers[entry->index].entry_op_manage = NULL;
1699   GNUNET_free (entry);
1700   //if (num_peers_in_round[current_round] == peers_running)
1701   //  run_round ();
1702 }
1703
1704 /**
1705  * @brief Set the rps-service up or down for a specific peer
1706  *
1707  * @param i index of action
1708  * @param j index of peer
1709  * @param delta (#PEER_ONLINE_DELTA) down (-1) or up (1)
1710  * @param prob_go_on_off the probability of the action
1711  */
1712 static void
1713 manage_service_wrapper (unsigned int i, unsigned int j,
1714                         enum PEER_ONLINE_DELTA delta,
1715                         double prob_go_on_off)
1716 {
1717   struct OpListEntry *entry = NULL;
1718   uint32_t prob;
1719
1720   /* make sure that management operation is not already scheduled */
1721   if (NULL != rps_peers[j].entry_op_manage)
1722   {
1723     return;
1724   }
1725
1726   prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1727                                    UINT32_MAX);
1728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1729               "%u. selected peer (%u: %s) is %s.\n",
1730               i,
1731               j,
1732               GNUNET_i2s (rps_peers[j].peer_id),
1733               (PEER_GO_ONLINE == delta) ? "online" : "offline");
1734   if (prob < prob_go_on_off * UINT32_MAX)
1735   {
1736     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1737                 "%s goes %s\n",
1738                 GNUNET_i2s (rps_peers[j].peer_id),
1739                 (PEER_GO_OFFLINE == delta) ? "offline" : "online");
1740
1741     if (PEER_GO_OFFLINE == delta)
1742       cancel_pending_req_rep (&rps_peers[j]);
1743     entry = make_oplist_entry ();
1744     entry->delta = delta;
1745     entry->index = j;
1746     entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
1747                                                     testbed_peers[j],
1748                                                     "rps",
1749                                                     &churn_cb,
1750                                                     entry,
1751                                                     (PEER_GO_OFFLINE == delta) ? 0 : 1);
1752     rps_peers[j].entry_op_manage = entry;
1753   }
1754 }
1755
1756
1757 static void
1758 churn (void *cls)
1759 {
1760   (void) cls;
1761   unsigned int i;
1762   unsigned int j;
1763   double portion_online;
1764   unsigned int *permut;
1765   double prob_go_offline;
1766   double portion_go_online;
1767   double portion_go_offline;
1768
1769   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1770   {
1771     return;
1772   }
1773   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1774               "Churn function executing\n");
1775
1776   churn_task = NULL; /* Should be invalid by now */
1777
1778   /* Compute the probability for an online peer to go offline
1779    * this round */
1780   portion_online = num_peers_online * 1.0 / num_peers;
1781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782               "Portion online: %f\n",
1783               portion_online);
1784   portion_go_online = ((1 - portion_online) * .5 * .66);
1785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1786               "Portion that should go online: %f\n",
1787               portion_go_online);
1788   portion_go_offline = (portion_online + portion_go_online) - .75;
1789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790               "Portion that probably goes offline: %f\n",
1791               portion_go_offline);
1792   prob_go_offline = portion_go_offline / (portion_online * .5);
1793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794               "Probability of a selected online peer to go offline: %f\n",
1795               prob_go_offline);
1796
1797   permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK,
1798                                          (unsigned int) num_peers);
1799
1800   /* Go over 50% randomly chosen peers */
1801   for (i = 0; i < .5 * num_peers; i++)
1802   {
1803     j = permut[i];
1804
1805     /* If online, shut down with certain probability */
1806     if (GNUNET_YES == rps_peers[j].online)
1807     {
1808       manage_service_wrapper (i, j, -1, prob_go_offline);
1809     }
1810
1811     /* If offline, restart with certain probability */
1812     else if (GNUNET_NO == rps_peers[j].online)
1813     {
1814       manage_service_wrapper (i, j, 1, 0.66);
1815     }
1816   }
1817
1818   GNUNET_free (permut);
1819
1820   churn_task = GNUNET_SCHEDULER_add_delayed (
1821         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1822         churn,
1823         NULL);
1824 }
1825
1826
1827 /**
1828  * Initialise given RPSPeer
1829  */
1830 static void profiler_init_peer (struct RPSPeer *rps_peer)
1831 {
1832   if (num_peers - 1 == rps_peer->index)
1833     rps_peer->num_ids_to_request = cur_test_run.num_requests;
1834 }
1835
1836
1837 /**
1838  * Callback to call on receipt of a reply
1839  *
1840  * @param cls closure
1841  * @param n number of peers
1842  * @param recv_peers the received peers
1843  */
1844 static void
1845 profiler_reply_handle (void *cls,
1846                       uint64_t n,
1847                       const struct GNUNET_PeerIdentity *recv_peers)
1848 {
1849   struct RPSPeer *rps_peer;
1850   struct RPSPeer *rcv_rps_peer;
1851   char *file_name;
1852   char *file_name_dh;
1853   unsigned int i;
1854   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1855
1856   rps_peer = pending_rep->rps_peer;
1857   file_name = "/tmp/rps/received_ids";
1858   file_name_dh = "/tmp/rps/diehard_input";
1859   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1860               "[%s] got %" PRIu64 " peers:\n",
1861               GNUNET_i2s (rps_peer->peer_id),
1862               n);
1863   for (i = 0; i < n; i++)
1864   {
1865     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1866                 "%u: %s\n",
1867                 i,
1868                 GNUNET_i2s (&recv_peers[i]));
1869     tofile (file_name,
1870              "%s\n",
1871              GNUNET_i2s_full (&recv_peers[i]));
1872     rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
1873     GNUNET_assert (NULL != rcv_rps_peer);
1874     tofile (file_name_dh,
1875              "%" PRIu32 "\n",
1876              (uint32_t) rcv_rps_peer->index);
1877   }
1878   default_reply_handle (cls, n, recv_peers);
1879 }
1880
1881
1882 static void
1883 profiler_cb (struct RPSPeer *rps_peer)
1884 {
1885   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
1886   {
1887     return;
1888   }
1889
1890   /* Start churn */
1891   if (HAVE_CHURN == cur_test_run.have_churn && NULL == churn_task)
1892   {
1893     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1894                 "Starting churn task\n");
1895     churn_task = GNUNET_SCHEDULER_add_delayed (
1896           GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1897           churn,
1898           NULL);
1899   } else {
1900     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1901                 "Not starting churn task\n");
1902   }
1903
1904   /* Only request peer ids at one peer.
1905    * (It's the before-last because last one is target of the focussed attack.)
1906    */
1907   if (eval_peer == rps_peer)
1908     schedule_missing_requests (rps_peer);
1909 }
1910
1911 /**
1912  * Function called from #profiler_eval with a filename.
1913  *
1914  * @param cls closure
1915  * @param filename complete filename (absolute path)
1916  * @return #GNUNET_OK to continue to iterate,
1917  *  #GNUNET_NO to stop iteration with no error,
1918  *  #GNUNET_SYSERR to abort iteration with error!
1919  */
1920 int
1921 file_name_cb (void *cls, const char *filename)
1922 {
1923   (void) cls;
1924
1925   if (NULL != strstr (filename, "sampler_el"))
1926   {
1927     struct RPS_SamplerElement *s_elem;
1928     struct GNUNET_CRYPTO_AuthKey auth_key;
1929     const char *key_char;
1930     uint32_t i;
1931
1932     key_char = filename + 20; /* Length of "/tmp/rps/sampler_el-" */
1933     tofile (filename, "--------------------------\n");
1934
1935     auth_key = string_to_auth_key (key_char);
1936     s_elem = RPS_sampler_elem_create ();
1937     RPS_sampler_elem_set (s_elem, auth_key);
1938
1939     for (i = 0; i < num_peers; i++)
1940     {
1941       RPS_sampler_elem_next (s_elem, &rps_peer_ids[i]);
1942     }
1943     RPS_sampler_elem_destroy (s_elem);
1944   }
1945   return GNUNET_OK;
1946 }
1947
1948 /**
1949  * This is run after the test finished.
1950  *
1951  * Compute all perfect samples.
1952  */
1953 int
1954 profiler_eval (void)
1955 {
1956   /* Compute perfect sample for each sampler element */
1957   if (-1 == GNUNET_DISK_directory_scan ("/tmp/rps/", file_name_cb, NULL))
1958   {
1959     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Scan of directory failed\n");
1960   }
1961
1962   return evaluate ();
1963 }
1964
1965 static uint32_t fac (uint32_t x)
1966 {
1967   if (1 >= x)
1968   {
1969     return x;
1970   }
1971   return x * fac (x - 1);
1972 }
1973
1974 static uint32_t binom (uint32_t n, uint32_t k)
1975 {
1976   //GNUNET_assert (n >= k);
1977   if (k > n) return 0;
1978   if (0 > n) return 0;
1979   if (0 > k) return 0;
1980   if (0 == k) return 1;
1981   return fac (n)
1982     /
1983     fac(k) * fac(n - k);
1984 }
1985
1986 /**
1987  * @brief is b in view of a?
1988  *
1989  * @param a
1990  * @param b
1991  *
1992  * @return
1993  */
1994 static int is_in_view (uint32_t a, uint32_t b)
1995 {
1996   uint32_t i;
1997   for (i = 0; i < rps_peers[a].cur_view_count; i++)
1998   {
1999     if (0 == memcmp (rps_peers[b].peer_id,
2000           &rps_peers[a].cur_view[i],
2001           sizeof (struct GNUNET_PeerIdentity)))
2002     {
2003       return GNUNET_YES;
2004     }
2005   }
2006   return GNUNET_NO;
2007 }
2008
2009 static uint32_t get_idx_of_pid (const struct GNUNET_PeerIdentity *pid)
2010 {
2011   uint32_t i;
2012
2013   for (i = 0; i < num_peers; i++)
2014   {
2015     if (0 == memcmp (pid,
2016           rps_peers[i].peer_id,
2017           sizeof (struct GNUNET_PeerIdentity)))
2018     {
2019       return i;
2020     }
2021   }
2022   //return 0; /* Should not happen - make compiler happy */
2023   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2024              "No known _PeerIdentity %s!\n",
2025              GNUNET_i2s_full (pid));
2026   GNUNET_assert (0);
2027 }
2028
2029 /**
2030  * @brief Counts number of peers in view of a that have b in their view
2031  *
2032  * @param a
2033  * @param uint32_tb
2034  *
2035  * @return
2036  */
2037 static uint32_t count_containing_views (uint32_t a, uint32_t b)
2038 {
2039   uint32_t i;
2040   uint32_t peer_idx;
2041   uint32_t count = 0;
2042
2043   for (i = 0; i < rps_peers[a].cur_view_count; i++)
2044   {
2045     peer_idx = get_idx_of_pid (&rps_peers[a].cur_view[i]);
2046     if (GNUNET_YES == is_in_view (peer_idx, b))
2047     {
2048       count++;
2049     }
2050   }
2051   return count;
2052 }
2053
2054 /**
2055  * @brief Computes the probability for each other peer to be selected by the
2056  * sampling process based on the views of all peers
2057  *
2058  * @param peer_idx index of the peer that is about to sample
2059  */
2060 static void compute_probabilities (uint32_t peer_idx)
2061 {
2062   //double probs[num_peers] = { 0 };
2063   double probs[num_peers];
2064   size_t probs_as_str_size = (num_peers * 10 + 1) * sizeof (char);
2065   char *probs_as_str = GNUNET_malloc (probs_as_str_size);
2066   char *probs_as_str_cpy;
2067   uint32_t i;
2068   double prob_push;
2069   double prob_pull;
2070   uint32_t view_size;
2071   uint32_t cont_views;
2072   uint32_t number_of_being_in_pull_events;
2073   int tmp;
2074   uint32_t count_non_zero_prob = 0;
2075
2076   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077       "Computing probabilities for peer %" PRIu32 "\n", peer_idx);
2078   /* Firstly without knowledge of old views */
2079   for (i = 0; i < num_peers; i++)
2080   {
2081     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2082         "\tfor peer %" PRIu32 ":\n", i);
2083     view_size = rps_peers[i].cur_view_count;
2084     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2085         "\t\tview_size: %" PRIu32 "\n", view_size);
2086     /* For peer i the probability of being sampled is
2087      * evenly distributed among all possibly observed peers. */
2088     /* We could have observed a peer in three cases:
2089      *   1. peer sent a push
2090      *   2. peer was contained in a pull reply
2091      *   3. peer was in history (sampler) - ignored for now */
2092     /* 1. Probability of having received a push from peer i */
2093     if ((GNUNET_YES == is_in_view (i, peer_idx)) &&
2094         (1 <= (0.45 * view_size)))
2095     {
2096       prob_push = 1.0 * binom (0.45 * view_size, 1)
2097         /
2098         binom (view_size, 0.45 * view_size);
2099       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2100                  "\t\t%" PRIu32 " is in %" PRIu32 "'s view, prob: %f\n",
2101                  peer_idx,
2102                  i,
2103                  prob_push);
2104       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105                  "\t\tposs choices from view: %" PRIu32 ", containing i: %" PRIu32 "\n",
2106                  binom (view_size, 0.45 * view_size),
2107                  binom (0.45 * view_size, 1));
2108     } else {
2109       prob_push = 0;
2110       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2111                  "\t\t%" PRIu32 " is not in %" PRIu32 "'s view, prob: 0\n",
2112                  peer_idx,
2113                  i);
2114     }
2115     /* 2. Probability of peer i being contained in pulls */
2116     view_size = rps_peers[peer_idx].cur_view_count;
2117     cont_views = count_containing_views (peer_idx, i);
2118     number_of_being_in_pull_events =
2119       (binom (view_size, 0.45 * view_size) -
2120        binom (view_size - cont_views, 0.45 * view_size));
2121     if (0 != number_of_being_in_pull_events)
2122     {
2123       prob_pull = number_of_being_in_pull_events
2124         /
2125         (1.0 * binom (view_size, 0.45 * view_size));
2126     } else
2127     {
2128       prob_pull = 0;
2129     }
2130     probs[i] = prob_push + prob_pull - (prob_push * prob_pull);
2131     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2132                "\t\t%" PRIu32 " has %" PRIu32 " of %" PRIu32
2133                " peers in its view who know %" PRIu32 " prob: %f\n",
2134                peer_idx,
2135                cont_views,
2136                view_size,
2137                i,
2138                prob_pull);
2139     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2140                "\t\tnumber of possible pull combinations: %" PRIu32 "\n",
2141                binom (view_size, 0.45 * view_size));
2142     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2143                "\t\tnumber of possible pull combinations without %" PRIu32
2144                ": %" PRIu32 "\n",
2145                i,
2146                binom (view_size - cont_views, 0.45 * view_size));
2147     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2148                "\t\tnumber of possible pull combinations with %" PRIu32
2149                ": %" PRIu32 "\n",
2150                i,
2151                number_of_being_in_pull_events);
2152
2153     if (0 != probs[i]) count_non_zero_prob++;
2154   }
2155   /* normalize */
2156   if (0 != count_non_zero_prob)
2157   {
2158     for (i = 0; i < num_peers; i++)
2159     {
2160       probs[i] = probs[i] * (1.0 / count_non_zero_prob);
2161     }
2162   } else {
2163     for (i = 0; i < num_peers; i++)
2164     {
2165       probs[i] = 0;
2166     }
2167   }
2168   /* str repr */
2169   for (i = 0; i < num_peers; i++)
2170   {
2171     probs_as_str_cpy = GNUNET_strndup (probs_as_str, probs_as_str_size);
2172     tmp = GNUNET_snprintf (probs_as_str,
2173                            probs_as_str_size,
2174                            "%s %7.6f", probs_as_str_cpy, probs[i]);
2175     GNUNET_free (probs_as_str_cpy);
2176     GNUNET_assert (0 <= tmp);
2177   }
2178
2179   to_file_w_len (rps_peers[peer_idx].file_name_probs,
2180                  probs_as_str_size,
2181                  probs_as_str);
2182   GNUNET_free (probs_as_str);
2183 }
2184
2185 /**
2186  * @brief This counts the number of peers in which views a given peer occurs.
2187  *
2188  * It also stores this value in the rps peer.
2189  *
2190  * @param peer_idx the index of the peer to count the representation
2191  *
2192  * @return the number of occurrences
2193  */
2194 static uint32_t count_peer_in_views_2 (uint32_t peer_idx)
2195 {
2196   uint32_t i, j;
2197   uint32_t count = 0;
2198
2199   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
2200   {
2201     for (j = 0; j < rps_peers[i].cur_view_count; j++) /* entry in view */
2202     {
2203       if (0 == memcmp (rps_peers[peer_idx].peer_id,
2204             &rps_peers[i].cur_view[j],
2205             sizeof (struct GNUNET_PeerIdentity)))
2206       {
2207         count++;
2208         break;
2209       }
2210     }
2211   }
2212   rps_peers[peer_idx].count_in_views = count;
2213   return count;
2214 }
2215
2216 static uint32_t cumulated_view_sizes ()
2217 {
2218   uint32_t i;
2219
2220   view_sizes = 0;
2221   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
2222   {
2223     view_sizes += rps_peers[i].cur_view_count;
2224   }
2225   return view_sizes;
2226 }
2227
2228 static void count_peer_in_views (uint32_t *count_peers)
2229 {
2230   uint32_t i, j;
2231
2232   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
2233   {
2234     for (j = 0; j < rps_peers[i].cur_view_count; j++) /* entry in view */
2235     {
2236       if (0 == memcmp (rps_peers[i].peer_id,
2237             &rps_peers[i].cur_view[j],
2238             sizeof (struct GNUNET_PeerIdentity)))
2239       {
2240         count_peers[i]++;
2241       }
2242     }
2243   }
2244 }
2245
2246 void compute_diversity ()
2247 {
2248   uint32_t i;
2249   /* ith entry represents the numer of occurrences in other peer's views */
2250   uint32_t *count_peers = GNUNET_new_array (num_peers, uint32_t);
2251   uint32_t views_total_size;
2252   double expected;
2253   /* deviation from expected number of peers */
2254   double *deviation = GNUNET_new_array (num_peers, double);
2255
2256   views_total_size = 0;
2257   expected = 0;
2258
2259   /* For each peer count its representation in other peer's views*/
2260   for (i = 0; i < num_peers; i++) /* Peer to count */
2261   {
2262     views_total_size += rps_peers[i].cur_view_count;
2263     count_peer_in_views (count_peers);
2264     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2265                "Counted representation of %" PRIu32 "th peer [%s]: %" PRIu32"\n",
2266                i,
2267                GNUNET_i2s (rps_peers[i].peer_id),
2268                count_peers[i]);
2269   }
2270
2271   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2272              "size of all views combined: %" PRIu32 "\n",
2273              views_total_size);
2274   expected = ((double) 1/num_peers) * views_total_size;
2275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2276              "Expected number of occurrences of each peer in all views: %f\n",
2277              expected);
2278   for (i = 0; i < num_peers; i++) /* Peer to count */
2279   {
2280     deviation[i] = expected - count_peers[i];
2281     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2282                "Deviation from expectation: %f\n", deviation[i]);
2283   }
2284   GNUNET_free (count_peers);
2285   GNUNET_free (deviation);
2286 }
2287
2288 void print_view_sizes()
2289 {
2290   uint32_t i;
2291
2292   for (i = 0; i < num_peers; i++) /* Peer to count */
2293   {
2294     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2295                "View size of %" PRIu32 ". [%s] is %" PRIu32 "\n",
2296                i,
2297                GNUNET_i2s (rps_peers[i].peer_id),
2298                rps_peers[i].cur_view_count);
2299   }
2300 }
2301
2302 void all_views_updated_cb()
2303 {
2304   compute_diversity();
2305   print_view_sizes();
2306 }
2307
2308 void view_update_cb (void *cls,
2309                      uint64_t view_size,
2310                      const struct GNUNET_PeerIdentity *peers)
2311 {
2312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2313               "View was updated (%" PRIu64 ")\n", view_size);
2314   struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
2315   to_file ("/tmp/rps/view_sizes.txt",
2316          "%" PRIu64 " %" PRIu32 "",
2317          rps_peer->index,
2318          view_size);
2319   for (int i = 0; i < view_size; i++)
2320   {
2321     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2322                "\t%s\n", GNUNET_i2s (&peers[i]));
2323   }
2324   GNUNET_array_grow (rps_peer->cur_view,
2325                      rps_peer->cur_view_count,
2326                      view_size);
2327   //*rps_peer->cur_view = *peers;
2328   GNUNET_memcpy (rps_peer->cur_view,
2329                  peers,
2330                  view_size * sizeof (struct GNUNET_PeerIdentity));
2331   to_file ("/tmp/rps/count_in_views.txt",
2332          "%" PRIu64 " %" PRIu32 "",
2333          rps_peer->index,
2334          count_peer_in_views_2 (rps_peer->index));
2335   cumulated_view_sizes();
2336   if (0 != view_size)
2337   {
2338     to_file ("/tmp/rps/repr.txt",
2339            "%" PRIu64 /* index */
2340            " %" PRIu32 /* occurrence in views */
2341            " %" PRIu32 /* view sizes */
2342            " %f" /* fraction of repr in views */
2343            " %f" /* average view size */
2344            " %f" /* prob of occurrence in view slot */
2345            " %f" "", /* exp frac of repr in views */
2346            rps_peer->index,
2347            count_peer_in_views_2 (rps_peer->index),
2348            view_sizes,
2349            count_peer_in_views_2 (rps_peer->index) / (view_size * 1.0), /* fraction of representation in views */
2350            view_sizes / (view_size * 1.0), /* average view size */
2351            1.0 /view_size, /* prob of occurrence in view slot */
2352            (1.0/view_size) * (view_sizes/view_size) /* expected fraction of repr in views */
2353            );
2354   }
2355   compute_probabilities (rps_peer->index);
2356   all_views_updated_cb();
2357 }
2358
2359 static void
2360 pre_profiler (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
2361 {
2362   rps_peer->file_name_probs =
2363     store_prefix_file_name (rps_peer->peer_id, "probs");
2364   GNUNET_RPS_view_request (h, 0, view_update_cb, rps_peer);
2365 }
2366
2367 void write_final_stats (void){
2368   uint32_t i;
2369
2370   for (i = 0; i < num_peers; i++)
2371   {
2372     to_file ("/tmp/rps/final_stats.dat",
2373              "%" PRIu32 " " /* index */
2374              "%s %" /* id */
2375              PRIu64 " %" /* rounds */
2376              PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" /* blocking */
2377              PRIu64 " %" PRIu64 " %" PRIu64 " %" /* issued */
2378              PRIu64 " %" PRIu64 " %" PRIu64 " %" /* sent */
2379              PRIu64 " %" PRIu64 " %" PRIu64 /* recv */,
2380              i,
2381              GNUNET_i2s (rps_peers[i].peer_id),
2382              rps_peers[i].num_rounds,
2383              rps_peers[i].num_blocks,
2384              rps_peers[i].num_blocks_many_push,
2385              rps_peers[i].num_blocks_no_push,
2386              rps_peers[i].num_blocks_no_pull,
2387              rps_peers[i].num_blocks_many_push_no_pull,
2388              rps_peers[i].num_blocks_no_push_no_pull,
2389              rps_peers[i].num_issued_push,
2390              rps_peers[i].num_issued_pull_req,
2391              rps_peers[i].num_issued_pull_rep,
2392              rps_peers[i].num_sent_push,
2393              rps_peers[i].num_sent_pull_req,
2394              rps_peers[i].num_sent_pull_rep,
2395              rps_peers[i].num_recv_push,
2396              rps_peers[i].num_recv_pull_req,
2397              rps_peers[i].num_recv_pull_rep);
2398   }
2399 }
2400
2401 /**
2402  * Continuation called by #GNUNET_STATISTICS_get() functions.
2403  *
2404  * Remembers that this specific statistics value was received for this peer.
2405  * Checks whether all peers received their statistics yet.
2406  * Issues the shutdown.
2407  *
2408  * @param cls closure
2409  * @param success #GNUNET_OK if statistics were
2410  *        successfully obtained, #GNUNET_SYSERR if not.
2411  */
2412 void
2413 post_test_shutdown_ready_cb (void *cls,
2414                              int success)
2415 {
2416   struct STATcls *stat_cls = (struct STATcls *) cls;
2417   struct RPSPeer *rps_peer = stat_cls->rps_peer;
2418   if (GNUNET_OK == success)
2419   {
2420     /* set flag that we we got the value */
2421     rps_peer->stat_collected_flags |= stat_cls->stat_type;
2422   } else {
2423     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2424         "Peer %u did not receive statistics value\n",
2425         rps_peer->index);
2426     GNUNET_free (stat_cls);
2427     GNUNET_break (0);
2428   }
2429
2430   if (NULL != rps_peer->stat_op &&
2431       GNUNET_YES == check_statistics_collect_completed_single_peer (rps_peer))
2432   {
2433     GNUNET_TESTBED_operation_done (rps_peer->stat_op);
2434   }
2435
2436   write_final_stats ();
2437   if (GNUNET_YES == check_statistics_collect_completed())
2438   {
2439     //write_final_stats ();
2440     GNUNET_free (stat_cls);
2441     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2442         "Shutting down\n");
2443     GNUNET_SCHEDULER_shutdown ();
2444   } else {
2445     GNUNET_free (stat_cls);
2446   }
2447 }
2448
2449 /**
2450  * @brief Converts string representation to the corresponding #STAT_TYPE enum.
2451  *
2452  * @param stat_str string representation of statistics specifier
2453  *
2454  * @return corresponding enum
2455  */
2456 enum STAT_TYPE stat_str_2_type (const char *stat_str)
2457 {
2458   if (0 == strncmp ("# rounds blocked - no pull replies", stat_str, strlen ("# rounds blocked - no pull replies")))
2459   {
2460     return STAT_TYPE_BLOCKS_NO_PULL;
2461   }
2462   else if (0 == strncmp ("# rounds blocked - too many pushes, no pull replies", stat_str, strlen ("# rounds blocked - too many pushes, no pull replies")))
2463   {
2464     return STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL;
2465   }
2466   else if (0 == strncmp ("# rounds blocked - too many pushes", stat_str, strlen ("# rounds blocked - too many pushes")))
2467   {
2468     return STAT_TYPE_BLOCKS_MANY_PUSH;
2469   }
2470   else if (0 == strncmp ("# rounds blocked - no pushes, no pull replies", stat_str, strlen ("# rounds blocked - no pushes, no pull replies")))
2471   {
2472     return STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL;
2473   }
2474   else if (0 == strncmp ("# rounds blocked - no pushes", stat_str, strlen ("# rounds blocked - no pushes")))
2475   {
2476     return STAT_TYPE_BLOCKS_NO_PUSH;
2477   }
2478   else if (0 == strncmp ("# rounds blocked", stat_str, strlen ("# rounds blocked")))
2479   {
2480     return STAT_TYPE_BLOCKS;
2481   }
2482   else if (0 == strncmp ("# rounds", stat_str, strlen ("# rounds")))
2483   {
2484     return STAT_TYPE_ROUNDS;
2485   }
2486   else if (0 == strncmp ("# push send issued", stat_str, strlen ("# push send issued")))
2487   {
2488     return STAT_TYPE_ISSUED_PUSH_SEND;
2489   }
2490   else if (0 == strncmp ("# pull request send issued", stat_str, strlen ("# pull request send issued")))
2491   {
2492     return STAT_TYPE_ISSUED_PULL_REQ;
2493   }
2494   else if (0 == strncmp ("# pull reply send issued", stat_str, strlen ("# pull reply send issued")))
2495   {
2496     return STAT_TYPE_ISSUED_PULL_REP;
2497   }
2498   else if (0 == strncmp ("# pushes sent", stat_str, strlen ("# pushes sent")))
2499   {
2500     return STAT_TYPE_SENT_PUSH_SEND;
2501   }
2502   else if (0 == strncmp ("# pull requests sent", stat_str, strlen ("# pull requests sent")))
2503   {
2504     return STAT_TYPE_SENT_PULL_REQ;
2505   }
2506   else if (0 == strncmp ("# pull replys sent", stat_str, strlen ("# pull replys sent")))
2507   {
2508     return STAT_TYPE_SENT_PULL_REP;
2509   }
2510   else if (0 == strncmp ("# push message received", stat_str, strlen ("# push message received")))
2511   {
2512     return STAT_TYPE_RECV_PUSH_SEND;
2513   }
2514   else if (0 == strncmp ("# pull request message received", stat_str, strlen ("# pull request message received")))
2515   {
2516     return STAT_TYPE_RECV_PULL_REQ;
2517   }
2518   else if (0 == strncmp ("# pull reply messages received", stat_str, strlen ("# pull reply messages received")))
2519   {
2520     return STAT_TYPE_RECV_PULL_REP;
2521   }
2522   return STAT_TYPE_MAX;
2523 }
2524
2525
2526 /**
2527  * @brief Converts #STAT_TYPE enum to the equivalent string representation that
2528  * is stored with the statistics service.
2529  *
2530  * @param stat_type #STAT_TYPE enum
2531  *
2532  * @return string representation that matches statistics value
2533  */
2534 char* stat_type_2_str (enum STAT_TYPE stat_type)
2535 {
2536   switch (stat_type)
2537   {
2538     case STAT_TYPE_ROUNDS:
2539       return "# rounds";
2540     case STAT_TYPE_BLOCKS:
2541       return "# rounds blocked";
2542     case STAT_TYPE_BLOCKS_MANY_PUSH:
2543       return "# rounds blocked - too many pushes";
2544     case STAT_TYPE_BLOCKS_NO_PUSH:
2545       return "# rounds blocked - no pushes";
2546     case STAT_TYPE_BLOCKS_NO_PULL:
2547       return "# rounds blocked - no pull replies";
2548     case STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL:
2549       return "# rounds blocked - too many pushes, no pull replies";
2550     case STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL:
2551       return "# rounds blocked - no pushes, no pull replies";
2552     case STAT_TYPE_ISSUED_PUSH_SEND:
2553       return "# push send issued";
2554     case STAT_TYPE_ISSUED_PULL_REQ:
2555       return "# pull request send issued";
2556     case STAT_TYPE_ISSUED_PULL_REP:
2557       return "# pull reply send issued";
2558     case STAT_TYPE_SENT_PUSH_SEND:
2559       return "# pushes sent";
2560     case STAT_TYPE_SENT_PULL_REQ:
2561       return "# pull requests sent";
2562     case STAT_TYPE_SENT_PULL_REP:
2563       return "# pull replys sent";
2564     case STAT_TYPE_RECV_PUSH_SEND:
2565       return "# push message received";
2566     case STAT_TYPE_RECV_PULL_REQ:
2567       return "# pull request message received";
2568     case STAT_TYPE_RECV_PULL_REP:
2569       return "# pull reply messages received";
2570     case STAT_TYPE_MAX:
2571     default:
2572       return "ERROR";
2573       ;
2574   }
2575 }
2576
2577 /**
2578  * Callback function to process statistic values.
2579  *
2580  * @param cls closure
2581  * @param subsystem name of subsystem that created the statistic
2582  * @param name the name of the datum
2583  * @param value the current value
2584  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
2585  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
2586  */
2587 int
2588 stat_iterator (void *cls,
2589                const char *subsystem,
2590                const char *name,
2591                uint64_t value,
2592                int is_persistent)
2593 {
2594   (void) subsystem;
2595   (void) is_persistent;
2596   const struct STATcls *stat_cls = (const struct STATcls *) cls;
2597   struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer;
2598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n",
2599       //stat_type_2_str (stat_cls->stat_type),
2600       name,
2601       value);
2602   to_file (rps_peer->file_name_stats,
2603           "%s: %" PRIu64 "\n",
2604           name,
2605           value);
2606   switch (stat_str_2_type (name))
2607   {
2608     case STAT_TYPE_ROUNDS:
2609       rps_peer->num_rounds = value;
2610       break;
2611     case STAT_TYPE_BLOCKS:
2612       rps_peer->num_blocks = value;
2613       break;
2614     case STAT_TYPE_BLOCKS_MANY_PUSH:
2615       rps_peer->num_blocks_many_push = value;
2616       break;
2617     case STAT_TYPE_BLOCKS_NO_PUSH:
2618       rps_peer->num_blocks_no_push = value;
2619       break;
2620     case STAT_TYPE_BLOCKS_NO_PULL:
2621       rps_peer->num_blocks_no_pull = value;
2622       break;
2623     case STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL:
2624       rps_peer->num_blocks_many_push_no_pull = value;
2625       break;
2626     case STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL:
2627       rps_peer->num_blocks_no_push_no_pull = value;
2628       break;
2629     case STAT_TYPE_ISSUED_PUSH_SEND:
2630       rps_peer->num_issued_push = value;
2631       break;
2632     case STAT_TYPE_ISSUED_PULL_REQ:
2633       rps_peer->num_issued_pull_req = value;
2634       break;
2635     case STAT_TYPE_ISSUED_PULL_REP:
2636       rps_peer->num_issued_pull_rep = value;
2637       break;
2638     case STAT_TYPE_SENT_PUSH_SEND:
2639       rps_peer->num_sent_push = value;
2640       break;
2641     case STAT_TYPE_SENT_PULL_REQ:
2642       rps_peer->num_sent_pull_req = value;
2643       break;
2644     case STAT_TYPE_SENT_PULL_REP:
2645       rps_peer->num_sent_pull_rep = value;
2646       break;
2647     case STAT_TYPE_RECV_PUSH_SEND:
2648       rps_peer->num_recv_push = value;
2649       break;
2650     case STAT_TYPE_RECV_PULL_REQ:
2651       rps_peer->num_recv_pull_req = value;
2652       break;
2653     case STAT_TYPE_RECV_PULL_REP:
2654       rps_peer->num_recv_pull_rep = value;
2655       break;
2656     case STAT_TYPE_MAX:
2657     default:
2658       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2659                  "Unknown statistics string: %s\n",
2660                  name);
2661       break;
2662   }
2663   return GNUNET_OK;
2664 }
2665
2666 void post_profiler (struct RPSPeer *rps_peer)
2667 {
2668   if (COLLECT_STATISTICS != cur_test_run.have_collect_statistics)
2669   {
2670     return;
2671   }
2672
2673   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2674       "Going to request statistic values with mask 0x%" PRIx32 "\n",
2675       cur_test_run.stat_collect_flags);
2676
2677   struct STATcls *stat_cls;
2678   uint32_t stat_type;
2679   for (stat_type = STAT_TYPE_ROUNDS;
2680       stat_type < STAT_TYPE_MAX;
2681       stat_type = stat_type <<1)
2682   {
2683     if (stat_type & cur_test_run.stat_collect_flags)
2684     {
2685       stat_cls = GNUNET_malloc (sizeof (struct STATcls));
2686       stat_cls->rps_peer = rps_peer;
2687       stat_cls->stat_type = stat_type;
2688       rps_peer->file_name_stats =
2689         store_prefix_file_name (rps_peer->peer_id, "stats");
2690       GNUNET_STATISTICS_get (rps_peer->stats_h,
2691                              "rps",
2692                              stat_type_2_str (stat_type),
2693                              post_test_shutdown_ready_cb,
2694                              stat_iterator,
2695                              (struct STATcls *) stat_cls);
2696       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2697           "Requested statistics for %s (peer %" PRIu32 ")\n",
2698           stat_type_2_str (stat_type),
2699           rps_peer->index);
2700     }
2701   }
2702 }
2703
2704
2705 /***********************************************************************
2706  * /Definition of tests
2707 ***********************************************************************/
2708
2709
2710 /**
2711  * Actual "main" function for the testcase.
2712  *
2713  * @param cls closure
2714  * @param h the run handle
2715  * @param n_peers number of peers in 'peers'
2716  * @param peers handle to peers run in the testbed
2717  * @param links_succeeded the number of overlay link connection attempts that
2718  *          succeeded
2719  * @param links_failed the number of overlay link connection attempts that
2720  *          failed
2721  */
2722 static void
2723 run (void *cls,
2724      struct GNUNET_TESTBED_RunHandle *h,
2725      unsigned int n_peers,
2726      struct GNUNET_TESTBED_Peer **peers,
2727      unsigned int links_succeeded,
2728      unsigned int links_failed)
2729 {
2730   (void) cls;
2731   (void) h;
2732   (void) links_failed;
2733   unsigned int i;
2734   struct OpListEntry *entry;
2735
2736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "RUN was called\n");
2737
2738   /* Check whether we timed out */
2739   if (n_peers != num_peers ||
2740       NULL == peers ||
2741       0 == links_succeeded)
2742   {
2743     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Going down due to args (eg. timeout)\n");
2744     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tn_peers: %u\n", n_peers);
2745     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tnum_peers: %" PRIu32 "\n", num_peers);
2746     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tpeers: %p\n", peers);
2747     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tlinks_succeeded: %u\n", links_succeeded);
2748     GNUNET_SCHEDULER_shutdown ();
2749     return;
2750   }
2751
2752
2753   /* Initialize peers */
2754   testbed_peers = peers;
2755   num_peers_online = 0;
2756   for (i = 0; i < num_peers; i++)
2757   {
2758     entry = make_oplist_entry ();
2759     entry->index = i;
2760     rps_peers[i].index = i;
2761     if (NULL != cur_test_run.init_peer)
2762       cur_test_run.init_peer (&rps_peers[i]);
2763     if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
2764     {
2765       rps_peers->cur_view_count = 0;
2766       rps_peers->cur_view = NULL;
2767     }
2768     entry->op = GNUNET_TESTBED_peer_get_information (peers[i],
2769                                                      GNUNET_TESTBED_PIT_IDENTITY,
2770                                                      &info_cb,
2771                                                      entry);
2772   }
2773
2774   /* Bring peers up */
2775   GNUNET_assert (num_peers == n_peers);
2776   for (i = 0; i < n_peers; i++)
2777   {
2778     rps_peers[i].index = i;
2779     rps_peers[i].op =
2780       GNUNET_TESTBED_service_connect (&rps_peers[i],
2781                                       peers[i],
2782                                       "rps",
2783                                       &rps_connect_complete_cb,
2784                                       &rps_peers[i],
2785                                       &rps_connect_adapter,
2786                                       &rps_disconnect_adapter,
2787                                       &rps_peers[i]);
2788     /* Connect all peers to statistics service */
2789     if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics)
2790     {
2791       rps_peers[i].stat_op =
2792         GNUNET_TESTBED_service_connect (NULL,
2793                                         peers[i],
2794                                         "statistics",
2795                                         stat_complete_cb,
2796                                         &rps_peers[i],
2797                                         &stat_connect_adapter,
2798                                         &stat_disconnect_adapter,
2799                                         &rps_peers[i]);
2800     }
2801   }
2802
2803   if (NULL != churn_task)
2804     GNUNET_SCHEDULER_cancel (churn_task);
2805   post_test_task = GNUNET_SCHEDULER_add_delayed (timeout, &post_test_op, NULL);
2806   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
2807       (timeout_s * 1.2) + 0.1 * num_peers);
2808   shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
2809 }
2810
2811
2812 /**
2813  * Entry point for the testcase, sets up the testbed.
2814  *
2815  * @param argc unused
2816  * @param argv unused
2817  * @return 0 on success
2818  */
2819 int
2820 main (int argc, char *argv[])
2821 {
2822   int ret_value;
2823   (void) argc;
2824
2825   /* Defaults for tests */
2826   num_peers = 5;
2827   cur_test_run.name = "test-rps-default";
2828   cur_test_run.init_peer = default_init_peer;
2829   cur_test_run.pre_test = NULL;
2830   cur_test_run.reply_handle = default_reply_handle;
2831   cur_test_run.eval_cb = default_eval_cb;
2832   cur_test_run.post_test = NULL;
2833   cur_test_run.have_churn = HAVE_CHURN;
2834   cur_test_run.have_collect_statistics = NO_COLLECT_STATISTICS;
2835   cur_test_run.stat_collect_flags = 0;
2836   cur_test_run.have_collect_view = NO_COLLECT_VIEW;
2837   churn_task = NULL;
2838   timeout_s = 30;
2839
2840   if (strstr (argv[0], "malicious") != NULL)
2841   {
2842     cur_test_run.pre_test = mal_pre;
2843     cur_test_run.main_test = mal_cb;
2844     cur_test_run.init_peer = mal_init_peer;
2845     timeout_s = 40;
2846
2847     if (strstr (argv[0], "_1") != NULL)
2848     {
2849       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 1\n");
2850       cur_test_run.name = "test-rps-malicious_1";
2851       mal_type = 1;
2852     }
2853     else if (strstr (argv[0], "_2") != NULL)
2854     {
2855       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 2\n");
2856       cur_test_run.name = "test-rps-malicious_2";
2857       mal_type = 2;
2858     }
2859     else if (strstr (argv[0], "_3") != NULL)
2860     {
2861       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 3\n");
2862       cur_test_run.name = "test-rps-malicious_3";
2863       mal_type = 3;
2864     }
2865   }
2866
2867   else if (strstr (argv[0], "_single_req") != NULL)
2868   {
2869     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test single request\n");
2870     cur_test_run.name = "test-rps-single-req";
2871     cur_test_run.main_test = single_req_cb;
2872     cur_test_run.have_churn = HAVE_NO_CHURN;
2873   }
2874
2875   else if (strstr (argv[0], "_delayed_reqs") != NULL)
2876   {
2877     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test delayed requests\n");
2878     cur_test_run.name = "test-rps-delayed-reqs";
2879     cur_test_run.main_test = delay_req_cb;
2880     cur_test_run.have_churn = HAVE_NO_CHURN;
2881   }
2882
2883   else if (strstr (argv[0], "_seed_big") != NULL)
2884   {
2885     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding (num_peers > GNUNET_MAX_MESSAGE_SIZE)\n");
2886     num_peers = 1;
2887     cur_test_run.name = "test-rps-seed-big";
2888     cur_test_run.main_test = seed_big_cb;
2889     cur_test_run.eval_cb = no_eval;
2890     cur_test_run.have_churn = HAVE_NO_CHURN;
2891     timeout_s = 10;
2892   }
2893
2894   else if (strstr (argv[0], "_single_peer_seed") != NULL)
2895   {
2896     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding and requesting on a single peer\n");
2897     cur_test_run.name = "test-rps-single-peer-seed";
2898     cur_test_run.main_test = single_peer_seed_cb;
2899     cur_test_run.have_churn = HAVE_NO_CHURN;
2900   }
2901
2902   else if (strstr (argv[0], "_seed_request") != NULL)
2903   {
2904     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding and requesting on multiple peers\n");
2905     cur_test_run.name = "test-rps-seed-request";
2906     cur_test_run.main_test = seed_req_cb;
2907     cur_test_run.have_churn = HAVE_NO_CHURN;
2908   }
2909
2910   else if (strstr (argv[0], "_seed") != NULL)
2911   {
2912     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding\n");
2913     cur_test_run.name = "test-rps-seed";
2914     cur_test_run.main_test = seed_cb;
2915     cur_test_run.eval_cb = no_eval;
2916     cur_test_run.have_churn = HAVE_NO_CHURN;
2917   }
2918
2919   else if (strstr (argv[0], "_req_cancel") != NULL)
2920   {
2921     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n");
2922     cur_test_run.name = "test-rps-req-cancel";
2923     num_peers = 1;
2924     cur_test_run.main_test = req_cancel_cb;
2925     cur_test_run.eval_cb = no_eval;
2926     cur_test_run.have_churn = HAVE_NO_CHURN;
2927     timeout_s = 10;
2928   }
2929
2930   else if (strstr (argv[0], "_churn") != NULL)
2931   {
2932     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test churn\n");
2933     cur_test_run.name = "test-rps-churn";
2934     num_peers = 5;
2935     cur_test_run.init_peer = default_init_peer;
2936     cur_test_run.main_test = churn_test_cb;
2937     cur_test_run.reply_handle = default_reply_handle;
2938     cur_test_run.eval_cb = default_eval_cb;
2939     cur_test_run.have_churn = HAVE_NO_CHURN;
2940     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
2941     timeout_s = 40;
2942   }
2943
2944   else if (strstr (argv[0], "_sub") != NULL)
2945   {
2946     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test subs\n");
2947     cur_test_run.name = "test-rps-sub";
2948     num_peers = 5;
2949     //cur_test_run.init_peer = &default_init_peer;
2950     cur_test_run.pre_test = &sub_pre;
2951     cur_test_run.main_test = &single_req_cb;
2952     //cur_test_run.reply_handle = default_reply_handle;
2953     cur_test_run.post_test = &sub_post;
2954     //cur_test_run.eval_cb = default_eval_cb;
2955     cur_test_run.have_churn = HAVE_NO_CHURN;
2956     cur_test_run.have_quick_quit = HAVE_QUICK_QUIT;
2957   }
2958
2959   else if (strstr (argv[0], "profiler") != NULL)
2960   {
2961     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
2962     cur_test_run.name = "test-rps-profiler";
2963     num_peers = 16;
2964     mal_type = 3;
2965     cur_test_run.init_peer = profiler_init_peer;
2966     //cur_test_run.pre_test = mal_pre;
2967     cur_test_run.pre_test = pre_profiler;
2968     cur_test_run.main_test = profiler_cb;
2969     cur_test_run.reply_handle = profiler_reply_handle;
2970     cur_test_run.eval_cb = profiler_eval;
2971     cur_test_run.post_test = post_profiler;
2972     cur_test_run.request_interval = 2;
2973     cur_test_run.num_requests = 5;
2974     //cur_test_run.have_churn = HAVE_CHURN;
2975     cur_test_run.have_churn = HAVE_NO_CHURN;
2976     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
2977     cur_test_run.have_collect_statistics = COLLECT_STATISTICS;
2978     cur_test_run.stat_collect_flags = STAT_TYPE_ROUNDS |
2979                                       STAT_TYPE_BLOCKS |
2980                                       STAT_TYPE_BLOCKS_MANY_PUSH |
2981                                       STAT_TYPE_BLOCKS_NO_PUSH |
2982                                       STAT_TYPE_BLOCKS_NO_PULL |
2983                                       STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL |
2984                                       STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL |
2985                                       STAT_TYPE_ISSUED_PUSH_SEND |
2986                                       STAT_TYPE_ISSUED_PULL_REQ |
2987                                       STAT_TYPE_ISSUED_PULL_REP |
2988                                       STAT_TYPE_SENT_PUSH_SEND |
2989                                       STAT_TYPE_SENT_PULL_REQ |
2990                                       STAT_TYPE_SENT_PULL_REP |
2991                                       STAT_TYPE_RECV_PUSH_SEND |
2992                                       STAT_TYPE_RECV_PULL_REQ |
2993                                       STAT_TYPE_RECV_PULL_REP;
2994     cur_test_run.have_collect_view = COLLECT_VIEW;
2995     timeout_s = 150;
2996
2997     /* 'Clean' directory */
2998     (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
2999     GNUNET_DISK_directory_create ("/tmp/rps/");
3000   }
3001   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, timeout_s);
3002
3003   rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
3004   peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
3005   rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
3006   if ( (2 == mal_type) ||
3007        (3 == mal_type))
3008     target_peer = &rps_peer_ids[num_peers - 2];
3009   if (profiler_eval == cur_test_run.eval_cb)
3010     eval_peer = &rps_peers[num_peers - 1];    /* FIXME: eval_peer could be a
3011                                                  malicious peer if not careful
3012                                                  with the malicious portion */
3013
3014   ok = 1;
3015   ret_value = GNUNET_TESTBED_test_run (cur_test_run.name,
3016                                        "test_rps.conf",
3017                                        num_peers,
3018                                        0, NULL, NULL,
3019                                        &run, NULL);
3020   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3021               "_test_run returned.\n");
3022   if (GNUNET_OK != ret_value)
3023   {
3024     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3025                 "Test did not run successfully!\n");
3026   }
3027
3028   ret_value = cur_test_run.eval_cb();
3029
3030   if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
3031   {
3032     GNUNET_array_grow (rps_peers->cur_view,
3033                        rps_peers->cur_view_count,
3034                        0);
3035   }
3036   GNUNET_free (rps_peers);
3037   GNUNET_free (rps_peer_ids);
3038   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
3039   return ret_value;
3040 }
3041
3042
3043 /* end of test_rps.c */