remove 'illegal' (non-reentrant) log logic from signal handler
[oweals/gnunet.git] / src / rps / test_rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2012 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file rps/test_rps.c
22  * @brief Testcase for the random peer sampling service.  Starts
23  *        a peergroup with a given number of peers, then waits to
24  *        receive size pushes/pulls from each peer.  Expects to wait
25  *        for one message from each peer.
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_testbed_service.h"
30
31 #include "gnunet_rps_service.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler_elem.h"
34
35 #include <inttypes.h>
36
37
38 /**
39  * How many peers do we start?
40  */
41 static uint32_t num_peers;
42
43 /**
44  * How long do we run the test?
45  * In seconds.
46  */
47 static uint32_t timeout_s;
48
49 /**
50  * How long do we run the test?
51  */
52 // #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
53 static struct GNUNET_TIME_Relative timeout;
54
55
56 /**
57  * Portion of malicious peers
58  */
59 static double portion = .1;
60
61 /**
62  * Type of malicious peer to test
63  */
64 static unsigned int mal_type = 0;
65
66 /**
67  * Handles to all of the running peers
68  */
69 static struct GNUNET_TESTBED_Peer **testbed_peers;
70
71 /**
72  * @brief Indicates whether peer should go off- or online
73  */
74 enum PEER_ONLINE_DELTA
75 {
76   /**
77    * @brief Indicates peer going online
78    */
79   PEER_GO_ONLINE = 1,
80   /**
81    * @brief Indicates peer going offline
82    */
83   PEER_GO_OFFLINE = -1,
84 };
85
86 /**
87  * Operation map entry
88  */
89 struct OpListEntry
90 {
91   /**
92    * DLL next ptr
93    */
94   struct OpListEntry *next;
95
96   /**
97    * DLL prev ptr
98    */
99   struct OpListEntry *prev;
100
101   /**
102    * The testbed operation
103    */
104   struct GNUNET_TESTBED_Operation *op;
105
106   /**
107    * Depending on whether we start or stop RPS service at the peer, set this to
108    * #PEER_GO_ONLINE (1) or #PEER_GO_OFFLINE (-1)
109    */
110   enum PEER_ONLINE_DELTA delta;
111
112   /**
113    * Index of the regarding peer
114    */
115   unsigned int index;
116 };
117
118 /**
119  * OpList DLL head
120  */
121 static struct OpListEntry *oplist_head;
122
123 /**
124  * OpList DLL tail
125  */
126 static struct OpListEntry *oplist_tail;
127
128
129 /**
130  * A pending reply: A request was sent and the reply is pending.
131  */
132 struct PendingReply
133 {
134   /**
135    * DLL next,prev ptr
136    */
137   struct PendingReply *next;
138   struct PendingReply *prev;
139
140   /**
141    * Handle to the request we are waiting for
142    */
143   struct GNUNET_RPS_Request_Handle *req_handle;
144
145   /**
146    * The peer that requested
147    */
148   struct RPSPeer *rps_peer;
149 };
150
151
152 /**
153  * A pending request: A request was not made yet but is scheduled for later.
154  */
155 struct PendingRequest
156 {
157   /**
158    * DLL next,prev ptr
159    */
160   struct PendingRequest *next;
161   struct PendingRequest *prev;
162
163   /**
164    * Handle to the request we are waiting for
165    */
166   struct GNUNET_SCHEDULER_Task *request_task;
167
168   /**
169    * The peer that requested
170    */
171   struct RPSPeer *rps_peer;
172 };
173
174
175 /**
176  * Information we track for each peer.
177  */
178 struct RPSPeer
179 {
180   /**
181    * Index of the peer.
182    */
183   unsigned int index;
184
185   /**
186    * Handle for RPS connect operation.
187    */
188   struct GNUNET_TESTBED_Operation *op;
189
190   /**
191    * Handle to RPS service.
192    */
193   struct GNUNET_RPS_Handle *rps_handle;
194
195   /**
196    * Handle to stream requests
197    */
198   struct GNUNET_RPS_StreamRequestHandle *rps_srh;
199
200   /**
201    * ID of the peer.
202    */
203   struct GNUNET_PeerIdentity *peer_id;
204
205   /**
206    * A request handle to check for an request
207    */
208   // struct GNUNET_RPS_Request_Handle *req_handle;
209
210   /**
211    * Peer on- or offline?
212    */
213   int online;
214
215   /**
216    * Number of Peer IDs to request during the whole test
217    */
218   unsigned int num_ids_to_request;
219
220   /**
221    * Pending requests DLL
222    */
223   struct PendingRequest *pending_req_head;
224   struct PendingRequest *pending_req_tail;
225
226   /**
227    * Number of pending requests
228    */
229   unsigned int num_pending_reqs;
230
231   /**
232    * Pending replies DLL
233    */
234   struct PendingReply *pending_rep_head;
235   struct PendingReply *pending_rep_tail;
236
237   /**
238    * Number of pending replies
239    */
240   unsigned int num_pending_reps;
241
242   /**
243    * Number of received PeerIDs
244    */
245   unsigned int num_recv_ids;
246
247   /**
248    * Pending operation on that peer
249    */
250   const struct OpListEntry *entry_op_manage;
251
252   /**
253    * Testbed operation to connect to statistics service
254    */
255   struct GNUNET_TESTBED_Operation *stat_op;
256
257   /**
258    * Handle to the statistics service
259    */
260   struct GNUNET_STATISTICS_Handle *stats_h;
261
262   /**
263    * @brief flags to indicate which statistics values have been already
264    * collected from the statistics service.
265    * Used to check whether we are able to shutdown.
266    */
267   uint32_t stat_collected_flags;
268
269   /**
270    * @brief File name of the file the stats are finally written to
271    */
272   const char *file_name_stats;
273
274   /**
275    * @brief File name of the file the stats are finally written to
276    */
277   const char *file_name_probs;
278
279   /**
280    * @brief The current view
281    */
282   struct GNUNET_PeerIdentity *cur_view;
283
284   /**
285    * @brief Number of peers in the #cur_view.
286    */
287   uint32_t cur_view_count;
288
289   /**
290    * @brief Number of occurrences in other peer's view
291    */
292   uint32_t count_in_views;
293
294   /**
295    * @brief statistics values
296    */
297   uint64_t num_rounds;
298   uint64_t num_blocks;
299   uint64_t num_blocks_many_push;
300   uint64_t num_blocks_no_push;
301   uint64_t num_blocks_no_pull;
302   uint64_t num_blocks_many_push_no_pull;
303   uint64_t num_blocks_no_push_no_pull;
304   uint64_t num_issued_push;
305   uint64_t num_issued_pull_req;
306   uint64_t num_issued_pull_rep;
307   uint64_t num_sent_push;
308   uint64_t num_sent_pull_req;
309   uint64_t num_sent_pull_rep;
310   uint64_t num_recv_push;
311   uint64_t num_recv_pull_req;
312   uint64_t num_recv_pull_rep;
313 };
314
315 enum STAT_TYPE
316 {
317   STAT_TYPE_ROUNDS                    =    0x1, /*   1 */
318   STAT_TYPE_BLOCKS                    =    0x2, /*   2 */
319   STAT_TYPE_BLOCKS_MANY_PUSH          =    0x4, /*   3 */
320   STAT_TYPE_BLOCKS_NO_PUSH            =    0x8, /*   4 */
321   STAT_TYPE_BLOCKS_NO_PULL            =   0x10, /*   5 */
322   STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL  =   0x20, /*   6 */
323   STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL    =   0x40, /*   7 */
324   STAT_TYPE_ISSUED_PUSH_SEND          =   0x80, /*   8 */
325   STAT_TYPE_ISSUED_PULL_REQ           =  0x100, /*   9 */
326   STAT_TYPE_ISSUED_PULL_REP           =  0x200, /*  10 */
327   STAT_TYPE_SENT_PUSH_SEND            =  0x400, /*  11 */
328   STAT_TYPE_SENT_PULL_REQ             =  0x800, /*  12 */
329   STAT_TYPE_SENT_PULL_REP             = 0x1000, /*  13 */
330   STAT_TYPE_RECV_PUSH_SEND            = 0x2000, /*  14 */
331   STAT_TYPE_RECV_PULL_REQ             = 0x4000, /*  15 */
332   STAT_TYPE_RECV_PULL_REP             = 0x8000, /*  16 */
333   STAT_TYPE_MAX          = 0x80000000, /*  32 */
334 };
335
336 struct STATcls
337 {
338   struct RPSPeer *rps_peer;
339   enum STAT_TYPE stat_type;
340 };
341
342
343 /**
344  * Information for all the peers.
345  */
346 static struct RPSPeer *rps_peers;
347
348 /**
349  * Peermap to get the index of a given peer ID quick.
350  */
351 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
352
353 /**
354  * IDs of the peers.
355  */
356 static struct GNUNET_PeerIdentity *rps_peer_ids;
357
358 /**
359  * ID of the targeted peer.
360  */
361 static struct GNUNET_PeerIdentity *target_peer;
362
363 /**
364  * ID of the peer that requests for the evaluation.
365  */
366 static struct RPSPeer *eval_peer;
367
368 /**
369  * Number of online peers.
370  */
371 static unsigned int num_peers_online;
372
373 /**
374  * @brief The added sizes of the peer's views
375  */
376 static unsigned int view_sizes;
377
378 /**
379  * Return value from 'main'.
380  */
381 static int ok;
382
383 /**
384  * Identifier for the churn task that runs periodically
385  */
386 static struct GNUNET_SCHEDULER_Task *post_test_task;
387
388 /**
389  * Identifier for the churn task that runs periodically
390  */
391 static struct GNUNET_SCHEDULER_Task *shutdown_task;
392
393 /**
394  * Identifier for the churn task that runs periodically
395  */
396 static struct GNUNET_SCHEDULER_Task *churn_task;
397
398 /**
399  * Called to initialise the given RPSPeer
400  */
401 typedef void (*InitPeer) (struct RPSPeer *rps_peer);
402
403 /**
404  * @brief Called directly after connecting to the service
405  *
406  * @param rps_peer Specific peer the function is called on
407  * @param h the handle to the rps service
408  */
409 typedef void (*PreTest) (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h);
410
411 /**
412  * @brief Executes functions to test the api/service for a given peer
413  *
414  * Called from within #rps_connect_complete_cb ()
415  * Implemented by #churn_test_cb, #profiler_cb, #mal_cb, #single_req_cb,
416  * #delay_req_cb, #seed_big_cb, #single_peer_seed_cb, #seed_cb, #req_cancel_cb
417  *
418  * @param rps_peer the peer the task runs on
419  */
420 typedef void (*MainTest) (struct RPSPeer *rps_peer);
421
422 /**
423  * Callback called once the requested random peers are available
424  */
425 typedef void (*ReplyHandle) (void *cls,
426                              uint64_t n,
427                              const struct GNUNET_PeerIdentity *recv_peers);
428
429 /**
430  * Called directly before disconnecting from the service
431  */
432 typedef void (*PostTest) (struct RPSPeer *peer);
433
434 /**
435  * Function called after disconnect to evaluate test success
436  */
437 typedef int (*EvaluationCallback) (void);
438
439 /**
440  * @brief Do we have Churn?
441  */
442 enum OPTION_CHURN
443 {
444   /**
445    * @brief If we have churn this is set
446    */
447   HAVE_CHURN,
448   /**
449    * @brief If we have no churn this is set
450    */
451   HAVE_NO_CHURN,
452 };
453
454 /**
455  * @brief Is it ok to quit the test before the timeout?
456  */
457 enum OPTION_QUICK_QUIT
458 {
459   /**
460    * @brief It is ok for the test to quit before the timeout triggers
461    */
462   HAVE_QUICK_QUIT,
463
464   /**
465    * @brief It is NOT ok for the test to quit before the timeout triggers
466    */
467   HAVE_NO_QUICK_QUIT,
468 };
469
470 /**
471  * @brief Do we collect statistics at the end?
472  */
473 enum OPTION_COLLECT_STATISTICS
474 {
475   /**
476    * @brief We collect statistics at the end
477    */
478   COLLECT_STATISTICS,
479
480   /**
481    * @brief We do not collect statistics at the end
482    */
483   NO_COLLECT_STATISTICS,
484 };
485
486 /**
487  * @brief Do we collect views during run?
488  */
489 enum OPTION_COLLECT_VIEW
490 {
491   /**
492    * @brief We collect view during run
493    */
494   COLLECT_VIEW,
495
496   /**
497    * @brief We do not collect the view during run
498    */
499   NO_COLLECT_VIEW,
500 };
501
502 /**
503  * Structure to define a single test
504  */
505 struct SingleTestRun
506 {
507   /**
508    * Name of the test
509    */
510   char *name;
511
512   /**
513    * Called with a single peer in order to initialise that peer
514    */
515   InitPeer init_peer;
516
517   /**
518    * Called directly after connecting to the service
519    */
520   PreTest pre_test;
521
522   /**
523    * Main function for each peer
524    */
525   MainTest main_test;
526
527   /**
528    * Callback called once the requested peers are available
529    */
530   ReplyHandle reply_handle;
531
532   /**
533    * Called directly before disconnecting from the service
534    */
535   PostTest post_test;
536
537   /**
538    * Function to evaluate the test results
539    */
540   EvaluationCallback eval_cb;
541
542   /**
543    * Request interval
544    */
545   uint32_t request_interval;
546
547   /**
548    * Number of Requests to make.
549    */
550   uint32_t num_requests;
551
552   /**
553    * Run with (-out) churn
554    */
555   enum OPTION_CHURN have_churn;
556
557   /**
558    * Quit test before timeout?
559    */
560   enum OPTION_QUICK_QUIT have_quick_quit;
561
562   /**
563    * Collect statistics at the end?
564    */
565   enum OPTION_COLLECT_STATISTICS have_collect_statistics;
566
567   /**
568    * Collect view during run?
569    */
570   enum OPTION_COLLECT_VIEW have_collect_view;
571
572   /**
573    * @brief Mark which values from the statistics service to collect at the end
574    * of the run
575    */
576   uint32_t stat_collect_flags;
577 } cur_test_run;
578
579 /**
580  * Did we finish the test?
581  */
582 static int post_test;
583
584 /**
585  * Are we shutting down?
586  */
587 static int in_shutdown;
588
589 /**
590  * Append arguments to file
591  */
592 static void
593 tofile_ (const char *file_name, const char *line)
594 {
595   struct GNUNET_DISK_FileHandle *f;
596   /* char output_buffer[512]; */
597   size_t size;
598   /* int size; */
599   size_t size2;
600
601   if (NULL == (f = GNUNET_DISK_file_open (file_name,
602                                           GNUNET_DISK_OPEN_APPEND
603                                           | GNUNET_DISK_OPEN_WRITE
604                                           | GNUNET_DISK_OPEN_CREATE,
605                                           GNUNET_DISK_PERM_USER_READ
606                                           | GNUNET_DISK_PERM_USER_WRITE
607                                           | GNUNET_DISK_PERM_GROUP_READ
608                                           | GNUNET_DISK_PERM_OTHER_READ)))
609   {
610     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
611                 "Not able to open file %s\n",
612                 file_name);
613     return;
614   }
615   /* size = GNUNET_snprintf (output_buffer,
616                           sizeof (output_buffer),
617                           "%llu %s\n",
618                           GNUNET_TIME_absolute_get ().abs_value_us,
619                           line);
620      if (0 > size)
621      {
622      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
623                 "Failed to write string to buffer (size: %i)\n",
624                 size);
625      return;
626      } */size = strlen (line) * sizeof(char);
627
628   size2 = GNUNET_DISK_file_write (f, line, size);
629   if (size != size2)
630   {
631     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
632                 "Unable to write to file! (Size: %lu, size2: %lu)\n",
633                 size,
634                 size2);
635     if (GNUNET_YES != GNUNET_DISK_file_close (f))
636     {
637       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
638                   "Unable to close file\n");
639     }
640     return;
641   }
642
643   if (GNUNET_YES != GNUNET_DISK_file_close (f))
644   {
645     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
646                 "Unable to close file\n");
647   }
648 }
649
650
651 /**
652  * This function is used to facilitate writing important information to disk
653  */
654 #define tofile(file_name, ...) do { \
655     char tmp_buf[512]; \
656     int size; \
657     size = GNUNET_snprintf (tmp_buf, sizeof(tmp_buf), __VA_ARGS__); \
658     if (0 > size) \
659       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, \
660                   "Failed to create tmp_buf\n"); \
661     else \
662       tofile_ (file_name, tmp_buf); \
663 } while (0);
664
665
666 /**
667  * Write the ids and their according index in the given array to a file
668  * Unused
669  */
670 /* static void
671    ids_to_file (char *file_name,
672              struct GNUNET_PeerIdentity *peer_ids,
673              unsigned int num_peer_ids)
674    {
675    unsigned int i;
676
677    for (i=0 ; i < num_peer_ids ; i++)
678    {
679     to_file (file_name,
680              "%u\t%s",
681              i,
682              GNUNET_i2s_full (&peer_ids[i]));
683    }
684    } */
685
686 /**
687  * Task run on timeout to collect statistics and potentially shut down.
688  */
689 static void
690 post_test_op (void *cls);
691
692
693 /**
694  * Test the success of a single test
695  */
696 static int
697 evaluate (void)
698 {
699   unsigned int i;
700   int tmp_ok;
701
702   tmp_ok = 1;
703
704   for (i = 0; i < num_peers; i++)
705   {
706     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707                 "%u. peer [%s] received %u of %u expected peer_ids: %i\n",
708                 i,
709                 GNUNET_i2s (rps_peers[i].peer_id),
710                 rps_peers[i].num_recv_ids,
711                 rps_peers[i].num_ids_to_request,
712                 (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids));
713     tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids);
714   }
715   return tmp_ok ? 0 : 1;
716 }
717
718
719 /**
720  * Creates an oplist entry and adds it to the oplist DLL
721  */
722 static struct OpListEntry *
723 make_oplist_entry ()
724 {
725   struct OpListEntry *entry;
726
727   entry = GNUNET_new (struct OpListEntry);
728   GNUNET_CONTAINER_DLL_insert_tail (oplist_head, oplist_tail, entry);
729   return entry;
730 }
731
732
733 /**
734  * @brief Checks if given peer already received its statistics value from the
735  * statistics service.
736  *
737  * @param rps_peer the peer to check for
738  *
739  * @return #GNUNET_YES if so
740  *         #GNUNET_NO otherwise
741  */
742 static int
743 check_statistics_collect_completed_single_peer (
744   const struct RPSPeer *rps_peer)
745 {
746   if (cur_test_run.stat_collect_flags !=
747       (cur_test_run.stat_collect_flags
748        & rps_peer->stat_collected_flags))
749   {
750     return GNUNET_NO;
751   }
752   return GNUNET_YES;
753 }
754
755
756 /**
757  * @brief Checks if all peers already received their statistics value from the
758  * statistics service.
759  *
760  * @return #GNUNET_YES if so
761  *         #GNUNET_NO otherwise
762  */
763 static int
764 check_statistics_collect_completed ()
765 {
766   uint32_t i;
767
768   for (i = 0; i < num_peers; i++)
769   {
770     if (GNUNET_NO == check_statistics_collect_completed_single_peer (
771           &rps_peers[i]))
772     {
773       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774                   "At least Peer %" PRIu32
775                   " did not yet receive all statistics values\n",
776                   i);
777       return GNUNET_NO;
778     }
779   }
780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781               "All peers received their statistics values\n");
782   return GNUNET_YES;
783 }
784
785
786 /**
787  * Task run on timeout to shut everything down.
788  */
789 static void
790 shutdown_op (void *cls)
791 {
792   unsigned int i;
793
794   (void) cls;
795
796   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
797               "Shutdown task scheduled, going down.\n");
798   in_shutdown = GNUNET_YES;
799   if (NULL != post_test_task)
800   {
801     GNUNET_SCHEDULER_cancel (post_test_task);
802     post_test_op (NULL);
803   }
804   if (NULL != churn_task)
805   {
806     GNUNET_SCHEDULER_cancel (churn_task);
807     churn_task = NULL;
808   }
809   for (i = 0; i < num_peers; i++)
810   {
811     if (NULL != rps_peers[i].rps_handle)
812     {
813       GNUNET_RPS_disconnect (rps_peers[i].rps_handle);
814     }
815     if (NULL != rps_peers[i].op)
816     {
817       GNUNET_TESTBED_operation_done (rps_peers[i].op);
818     }
819   }
820 }
821
822
823 /**
824  * Task run on timeout to collect statistics and potentially shut down.
825  */
826 static void
827 post_test_op (void *cls)
828 {
829   unsigned int i;
830
831   (void) cls;
832
833   post_test_task = NULL;
834   post_test = GNUNET_YES;
835   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
836               "Post test task scheduled, going down.\n");
837   if (NULL != churn_task)
838   {
839     GNUNET_SCHEDULER_cancel (churn_task);
840     churn_task = NULL;
841   }
842   for (i = 0; i < num_peers; i++)
843   {
844     if (NULL != cur_test_run.post_test)
845     {
846       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n",
847                   i);
848       cur_test_run.post_test (&rps_peers[i]);
849     }
850     if (NULL != rps_peers[i].op)
851     {
852       GNUNET_TESTBED_operation_done (rps_peers[i].op);
853       rps_peers[i].op = NULL;
854     }
855   }
856   /* If we do not collect statistics, shut down directly */
857   if ((NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics) ||
858       (GNUNET_YES == check_statistics_collect_completed ()) )
859   {
860     GNUNET_SCHEDULER_shutdown ();
861   }
862 }
863
864
865 /**
866  * Seed peers.
867  */
868 static void
869 seed_peers (void *cls)
870 {
871   struct RPSPeer *peer = cls;
872   unsigned int amount;
873   unsigned int i;
874
875   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
876   {
877     return;
878   }
879
880   GNUNET_assert (NULL != peer->rps_handle);
881
882   // TODO if malicious don't seed mal peers
883   amount = round (.5 * num_peers);
884
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding peers:\n");
886   for (i = 0; i < amount; i++)
887     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
888                 i,
889                 GNUNET_i2s (&rps_peer_ids[i]));
890
891   GNUNET_RPS_seed_ids (peer->rps_handle, amount, rps_peer_ids);
892 }
893
894
895 /**
896  * Seed peers.
897  */
898 static void
899 seed_peers_big (void *cls)
900 {
901   struct RPSPeer *peer = cls;
902   unsigned int seed_msg_size;
903   uint32_t num_peers_max;
904   unsigned int amount;
905   unsigned int i;
906
907   seed_msg_size = 8; /* sizeof (struct GNUNET_RPS_CS_SeedMessage) */
908   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE - seed_msg_size)
909                   / sizeof(struct GNUNET_PeerIdentity);
910   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911               "Peers that fit in one seed msg; %u\n",
912               num_peers_max);
913   amount = num_peers_max + (0.5 * num_peers_max);
914   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
915               "Seeding many (%u) peers:\n",
916               amount);
917   struct GNUNET_PeerIdentity ids_to_seed[amount];
918   for (i = 0; i < amount; i++)
919   {
920     ids_to_seed[i] = *peer->peer_id;
921     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
922                 i,
923                 GNUNET_i2s (&ids_to_seed[i]));
924   }
925
926   GNUNET_RPS_seed_ids (peer->rps_handle, amount, ids_to_seed);
927 }
928
929
930 /**
931  * Get the id of peer i.
932  */
933 void
934 info_cb (void *cb_cls,
935          struct GNUNET_TESTBED_Operation *op,
936          const struct GNUNET_TESTBED_PeerInformation *pinfo,
937          const char *emsg)
938 {
939   struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
940
941   (void) op;
942
943   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
944   {
945     return;
946   }
947
948   if ((NULL == pinfo) || (NULL != emsg))
949   {
950     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg);
951     GNUNET_TESTBED_operation_done (entry->op);
952     return;
953   }
954
955   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
956               "Peer %u is %s\n",
957               entry->index,
958               GNUNET_i2s (pinfo->result.id));
959
960   rps_peer_ids[entry->index] = *(pinfo->result.id);
961   rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index];
962
963   GNUNET_assert (GNUNET_OK ==
964                  GNUNET_CONTAINER_multipeermap_put (peer_map,
965                                                     &rps_peer_ids[entry->index],
966                                                     &rps_peers[entry->index],
967                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
968   tofile ("/tmp/rps/peer_ids",
969           "%u\t%s\n",
970           entry->index,
971           GNUNET_i2s_full (&rps_peer_ids[entry->index]));
972
973   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
974   GNUNET_TESTBED_operation_done (entry->op);
975   GNUNET_free (entry);
976 }
977
978
979 /**
980  * Callback to be called when RPS service connect operation is completed
981  *
982  * @param cls the callback closure from functions generating an operation
983  * @param op the operation that has been finished
984  * @param ca_result the RPS service handle returned from rps_connect_adapter
985  * @param emsg error message in case the operation has failed; will be NULL if
986  *          operation has executed successfully.
987  */
988 static void
989 rps_connect_complete_cb (void *cls,
990                          struct GNUNET_TESTBED_Operation *op,
991                          void *ca_result,
992                          const char *emsg)
993 {
994   struct RPSPeer *rps_peer = cls;
995   struct GNUNET_RPS_Handle *rps = ca_result;
996
997   GNUNET_assert (NULL != ca_result);
998
999   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1000   {
1001     return;
1002   }
1003
1004   rps_peer->rps_handle = rps;
1005   rps_peer->online = GNUNET_YES;
1006   num_peers_online++;
1007
1008   GNUNET_assert (op == rps_peer->op);
1009   if (NULL != emsg)
1010   {
1011     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1012                 "Failed to connect to RPS service: %s\n",
1013                 emsg);
1014     ok = 1;
1015     GNUNET_SCHEDULER_shutdown ();
1016     return;
1017   }
1018
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Started client successfully\n");
1020
1021   cur_test_run.main_test (rps_peer);
1022 }
1023
1024
1025 /**
1026  * Adapter function called to establish a connection to
1027  * the RPS service.
1028  *
1029  * @param cls closure
1030  * @param cfg configuration of the peer to connect to; will be available until
1031  *          GNUNET_TESTBED_operation_done() is called on the operation returned
1032  *          from GNUNET_TESTBED_service_connect()
1033  * @return service handle to return in 'op_result', NULL on error
1034  */
1035 static void *
1036 rps_connect_adapter (void *cls,
1037                      const struct GNUNET_CONFIGURATION_Handle *cfg)
1038 {
1039   struct GNUNET_RPS_Handle *h;
1040
1041   h = GNUNET_RPS_connect (cfg);
1042   GNUNET_assert (NULL != h);
1043
1044   if (NULL != cur_test_run.pre_test)
1045     cur_test_run.pre_test (cls, h);
1046   GNUNET_assert (NULL != h);
1047
1048   return h;
1049 }
1050
1051
1052 /**
1053  * Called to open a connection to the peer's statistics
1054  *
1055  * @param cls peer context
1056  * @param cfg configuration of the peer to connect to; will be available until
1057  *          GNUNET_TESTBED_operation_done() is called on the operation returned
1058  *          from GNUNET_TESTBED_service_connect()
1059  * @return service handle to return in 'op_result', NULL on error
1060  */
1061 static void *
1062 stat_connect_adapter (void *cls,
1063                       const struct GNUNET_CONFIGURATION_Handle *cfg)
1064 {
1065   struct RPSPeer *peer = cls;
1066
1067   peer->stats_h = GNUNET_STATISTICS_create ("rps-profiler", cfg);
1068   return peer->stats_h;
1069 }
1070
1071
1072 /**
1073  * Called to disconnect from peer's statistics service
1074  *
1075  * @param cls peer context
1076  * @param op_result service handle returned from the connect adapter
1077  */
1078 static void
1079 stat_disconnect_adapter (void *cls, void *op_result)
1080 {
1081   struct RPSPeer *peer = cls;
1082
1083   // GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
1084   //              (peer->stats_h, "core", "# peers connected",
1085   //               stat_iterator, peer));
1086   // GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
1087   //              (peer->stats_h, "nse", "# peers connected",
1088   //               stat_iterator, peer));
1089   GNUNET_STATISTICS_destroy (op_result, GNUNET_NO);
1090   peer->stats_h = NULL;
1091 }
1092
1093
1094 /**
1095  * Called after successfully opening a connection to a peer's statistics
1096  * service; we register statistics monitoring for CORE and NSE here.
1097  *
1098  * @param cls the callback closure from functions generating an operation
1099  * @param op the operation that has been finished
1100  * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
1101  * @param emsg error message in case the operation has failed; will be NULL if
1102  *          operation has executed successfully.
1103  */
1104 static void
1105 stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
1106                   void *ca_result, const char *emsg)
1107 {
1108   // struct GNUNET_STATISTICS_Handle *sh = ca_result;
1109   // struct RPSPeer *peer = (struct RPSPeer *) cls;
1110   (void) cls;
1111   (void) op;
1112   (void) ca_result;
1113
1114   if (NULL != emsg)
1115   {
1116     GNUNET_break (0);
1117     return;
1118   }
1119   // GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
1120   //              (sh, "core", "# peers connected",
1121   //               stat_iterator, peer));
1122   // GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
1123   //              (sh, "nse", "# peers connected",
1124   //               stat_iterator, peer));
1125 }
1126
1127
1128 /**
1129  * Adapter function called to destroy connection to
1130  * RPS service.
1131  *
1132  * @param cls closure
1133  * @param op_result service handle returned from the connect adapter
1134  */
1135 static void
1136 rps_disconnect_adapter (void *cls,
1137                         void *op_result)
1138 {
1139   struct RPSPeer *peer = cls;
1140   struct GNUNET_RPS_Handle *h = op_result;
1141
1142   if (NULL != peer->rps_srh)
1143   {
1144     GNUNET_RPS_stream_cancel (peer->rps_srh);
1145     peer->rps_srh = NULL;
1146   }
1147   GNUNET_assert (NULL != peer);
1148   GNUNET_RPS_disconnect (h);
1149   peer->rps_handle = NULL;
1150 }
1151
1152
1153 /***********************************************************************
1154 * Definition of tests
1155 ***********************************************************************/
1156
1157 // TODO check whether tests can be stopped earlier
1158 static int
1159 default_eval_cb (void)
1160 {
1161   return evaluate ();
1162 }
1163
1164
1165 static int
1166 no_eval (void)
1167 {
1168   return 0;
1169 }
1170
1171
1172 /**
1173  * Initialise given RPSPeer
1174  */
1175 static void
1176 default_init_peer (struct RPSPeer *rps_peer)
1177 {
1178   rps_peer->num_ids_to_request = 1;
1179 }
1180
1181
1182 /**
1183  * Callback to call on receipt of a reply
1184  *
1185  * @param cls closure
1186  * @param n number of peers
1187  * @param recv_peers the received peers
1188  */
1189 static void
1190 default_reply_handle (void *cls,
1191                       uint64_t n,
1192                       const struct GNUNET_PeerIdentity *recv_peers)
1193 {
1194   struct RPSPeer *rps_peer;
1195   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1196   unsigned int i;
1197
1198   rps_peer = pending_rep->rps_peer;
1199   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1200                                rps_peer->pending_rep_tail,
1201                                pending_rep);
1202   rps_peer->num_pending_reps--;
1203   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1204               "[%s] got %" PRIu64 " peers:\n",
1205               GNUNET_i2s (rps_peer->peer_id),
1206               n);
1207
1208   for (i = 0; i < n; i++)
1209   {
1210     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1211                 "%u: %s\n",
1212                 i,
1213                 GNUNET_i2s (&recv_peers[i]));
1214
1215     rps_peer->num_recv_ids++;
1216   }
1217
1218   if ((0 == evaluate ()) && (HAVE_QUICK_QUIT == cur_test_run.have_quick_quit))
1219   {
1220     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test succeeded before timeout\n");
1221     GNUNET_assert (NULL != post_test_task);
1222     GNUNET_SCHEDULER_cancel (post_test_task);
1223     post_test_task = GNUNET_SCHEDULER_add_now (&post_test_op, NULL);
1224     GNUNET_assert (NULL != post_test_task);
1225   }
1226 }
1227
1228
1229 /**
1230  * Request random peers.
1231  */
1232 static void
1233 request_peers (void *cls)
1234 {
1235   struct PendingRequest *pending_req = cls;
1236   struct RPSPeer *rps_peer;
1237   struct PendingReply *pending_rep;
1238
1239   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1240     return;
1241   rps_peer = pending_req->rps_peer;
1242   GNUNET_assert (1 <= rps_peer->num_pending_reqs);
1243   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
1244                                rps_peer->pending_req_tail,
1245                                pending_req);
1246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247               "Requesting one peer\n");
1248   pending_rep = GNUNET_new (struct PendingReply);
1249   pending_rep->rps_peer = rps_peer;
1250   pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
1251                                                       1,
1252                                                       cur_test_run.reply_handle,
1253                                                       pending_rep);
1254   GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
1255                                     rps_peer->pending_rep_tail,
1256                                     pending_rep);
1257   rps_peer->num_pending_reps++;
1258   rps_peer->num_pending_reqs--;
1259 }
1260
1261
1262 static void
1263 cancel_pending_req (struct PendingRequest *pending_req)
1264 {
1265   struct RPSPeer *rps_peer;
1266
1267   rps_peer = pending_req->rps_peer;
1268   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
1269                                rps_peer->pending_req_tail,
1270                                pending_req);
1271   rps_peer->num_pending_reqs--;
1272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273               "Cancelling pending request\n");
1274   GNUNET_SCHEDULER_cancel (pending_req->request_task);
1275   GNUNET_free (pending_req);
1276 }
1277
1278
1279 static void
1280 cancel_request (struct PendingReply *pending_rep)
1281 {
1282   struct RPSPeer *rps_peer;
1283
1284   rps_peer = pending_rep->rps_peer;
1285   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1286                                rps_peer->pending_rep_tail,
1287                                pending_rep);
1288   rps_peer->num_pending_reps--;
1289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1290               "Cancelling request\n");
1291   GNUNET_RPS_request_cancel (pending_rep->req_handle);
1292   GNUNET_free (pending_rep);
1293 }
1294
1295
1296 /**
1297  * Cancel a request.
1298  */
1299 static void
1300 cancel_request_cb (void *cls)
1301 {
1302   struct RPSPeer *rps_peer = cls;
1303   struct PendingReply *pending_rep;
1304
1305   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1306     return;
1307   pending_rep = rps_peer->pending_rep_head;
1308   GNUNET_assert (1 <= rps_peer->num_pending_reps);
1309   cancel_request (pending_rep);
1310 }
1311
1312
1313 /**
1314  * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
1315  * issued, nor replied
1316  */
1317 void
1318 schedule_missing_requests (struct RPSPeer *rps_peer)
1319 {
1320   unsigned int i;
1321   struct PendingRequest *pending_req;
1322
1323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324               "Scheduling %u - %u missing requests\n",
1325               rps_peer->num_ids_to_request,
1326               rps_peer->num_pending_reqs + rps_peer->num_pending_reps);
1327   GNUNET_assert (rps_peer->num_pending_reqs + rps_peer->num_pending_reps <=
1328                  rps_peer->num_ids_to_request);
1329   for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
1330        i < rps_peer->num_ids_to_request; i++)
1331   {
1332     pending_req = GNUNET_new (struct PendingRequest);
1333     pending_req->rps_peer = rps_peer;
1334     pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
1335       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1336                                      cur_test_run.request_interval * i),
1337       request_peers,
1338       pending_req);
1339     GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
1340                                       rps_peer->pending_req_tail,
1341                                       pending_req);
1342     rps_peer->num_pending_reqs++;
1343   }
1344 }
1345
1346
1347 void
1348 cancel_pending_req_rep (struct RPSPeer *rps_peer)
1349 {
1350   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1351               "Cancelling all (pending) requests.\n");
1352   while (NULL != rps_peer->pending_req_head)
1353     cancel_pending_req (rps_peer->pending_req_head);
1354   GNUNET_assert (0 == rps_peer->num_pending_reqs);
1355   while (NULL != rps_peer->pending_rep_head)
1356     cancel_request (rps_peer->pending_rep_head);
1357   GNUNET_assert (0 == rps_peer->num_pending_reps);
1358 }
1359
1360
1361 /***********************************
1362 * MALICIOUS
1363 ***********************************/
1364
1365 /**
1366  * Initialise only non-mal RPSPeers
1367  */
1368 static void
1369 mal_init_peer (struct RPSPeer *rps_peer)
1370 {
1371   if (rps_peer->index >= round (portion * num_peers))
1372     rps_peer->num_ids_to_request = 1;
1373 }
1374
1375
1376 /**
1377  * @brief Set peers to (non-)malicious before execution
1378  *
1379  * Of signature #PreTest
1380  *
1381  * @param rps_peer the peer to set (non-) malicious
1382  * @param h the handle to the service
1383  */
1384 static void
1385 mal_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
1386 {
1387   #if ENABLE_MALICIOUS
1388   uint32_t num_mal_peers;
1389
1390   GNUNET_assert ((1 >= portion) &&
1391                  (0 < portion));
1392   num_mal_peers = round (portion * num_peers);
1393
1394   if (rps_peer->index < num_mal_peers)
1395   {
1396     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1397                 "%u. peer [%s] of %" PRIu32
1398                 " malicious peers turning malicious\n",
1399                 rps_peer->index,
1400                 GNUNET_i2s (rps_peer->peer_id),
1401                 num_mal_peers);
1402
1403     GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
1404                               rps_peer_ids, target_peer);
1405   }
1406   #endif /* ENABLE_MALICIOUS */
1407 }
1408
1409
1410 static void
1411 mal_cb (struct RPSPeer *rps_peer)
1412 {
1413   uint32_t num_mal_peers;
1414
1415   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1416   {
1417     return;
1418   }
1419
1420   #if ENABLE_MALICIOUS
1421   GNUNET_assert ((1 >= portion) &&
1422                  (0 < portion));
1423   num_mal_peers = round (portion * num_peers);
1424
1425   if (rps_peer->index >= num_mal_peers)
1426   {   /* It's useless to ask a malicious peer about a random sample -
1427          it's not sampling */
1428     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
1429                                     GNUNET_TIME_UNIT_SECONDS, 2),
1430                                   seed_peers, rps_peer);
1431     schedule_missing_requests (rps_peer);
1432   }
1433   #endif /* ENABLE_MALICIOUS */
1434 }
1435
1436
1437 /***********************************
1438 * SINGLE_REQUEST
1439 ***********************************/
1440 static void
1441 single_req_cb (struct RPSPeer *rps_peer)
1442 {
1443   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1444   {
1445     return;
1446   }
1447
1448   schedule_missing_requests (rps_peer);
1449 }
1450
1451
1452 /***********************************
1453 * DELAYED_REQUESTS
1454 ***********************************/
1455 static void
1456 delay_req_cb (struct RPSPeer *rps_peer)
1457 {
1458   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1459   {
1460     return;
1461   }
1462
1463   schedule_missing_requests (rps_peer);
1464 }
1465
1466
1467 /***********************************
1468 * SEED
1469 ***********************************/
1470 static void
1471 seed_cb (struct RPSPeer *rps_peer)
1472 {
1473   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1474   {
1475     return;
1476   }
1477
1478   GNUNET_SCHEDULER_add_delayed (
1479     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
1480     seed_peers, rps_peer);
1481 }
1482
1483
1484 /***********************************
1485 * SEED_BIG
1486 ***********************************/
1487 static void
1488 seed_big_cb (struct RPSPeer *rps_peer)
1489 {
1490   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1491   {
1492     return;
1493   }
1494
1495   // TODO test seeding > GNUNET_MAX_MESSAGE_SIZE peers
1496   GNUNET_SCHEDULER_add_delayed (
1497     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1498     seed_peers_big, rps_peer);
1499 }
1500
1501
1502 /***********************************
1503 * SINGLE_PEER_SEED
1504 ***********************************/
1505 static void
1506 single_peer_seed_cb (struct RPSPeer *rps_peer)
1507 {
1508   (void) rps_peer;
1509   // TODO
1510 }
1511
1512
1513 /***********************************
1514 * SEED_REQUEST
1515 ***********************************/
1516 static void
1517 seed_req_cb (struct RPSPeer *rps_peer)
1518 {
1519   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1520   {
1521     return;
1522   }
1523
1524   GNUNET_SCHEDULER_add_delayed (
1525     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1526     seed_peers, rps_peer);
1527   schedule_missing_requests (rps_peer);
1528 }
1529
1530
1531 // TODO start big mal
1532
1533 /***********************************
1534 * REQUEST_CANCEL
1535 ***********************************/
1536 static void
1537 req_cancel_cb (struct RPSPeer *rps_peer)
1538 {
1539   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1540   {
1541     return;
1542   }
1543
1544   schedule_missing_requests (rps_peer);
1545   GNUNET_SCHEDULER_add_delayed (
1546     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1547                                    (cur_test_run.request_interval + 1)),
1548     cancel_request_cb, rps_peer);
1549 }
1550
1551
1552 /***********************************
1553 * CHURN
1554 ***********************************/
1555
1556 static void
1557 churn (void *cls);
1558
1559 /**
1560  * @brief Starts churn
1561  *
1562  * Has signature of #MainTest
1563  *
1564  * This is not implemented too nicely as this is called for each peer, but we
1565  * only need to call it once. (Yes we check that we only schedule the task
1566  * once.)
1567  *
1568  * @param rps_peer The peer it's called for
1569  */
1570 static void
1571 churn_test_cb (struct RPSPeer *rps_peer)
1572 {
1573   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1574   {
1575     return;
1576   }
1577
1578   /* Start churn */
1579   if ((HAVE_CHURN == cur_test_run.have_churn) && (NULL == churn_task))
1580   {
1581     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1582                 "Starting churn task\n");
1583     churn_task = GNUNET_SCHEDULER_add_delayed (
1584       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1585       churn,
1586       NULL);
1587   }
1588   else
1589   {
1590     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1591                 "Not starting churn task\n");
1592   }
1593
1594   schedule_missing_requests (rps_peer);
1595 }
1596
1597
1598 /***********************************
1599 * SUB
1600 ***********************************/
1601
1602 static void
1603 got_stream_peer_cb (void *cls,
1604                     uint64_t num_peers,
1605                     const struct GNUNET_PeerIdentity *peers)
1606 {
1607   const struct RPSPeer *rps_peer = cls;
1608
1609   for (uint64_t i = 0; i < num_peers; i++)
1610   {
1611     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1612                 "Peer %" PRIu32 " received [%s] from stream.\n",
1613                 rps_peer->index,
1614                 GNUNET_i2s (&peers[i]));
1615     if ((0 != rps_peer->index) &&
1616         (0 == memcmp (&peers[i],
1617                       &rps_peers[0].peer_id,
1618                       sizeof(struct GNUNET_PeerIdentity))))
1619     {
1620       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1621                   "Received a peer id outside sub\n");
1622       ok = 1;
1623     }
1624     else if ((0 == rps_peer->index) &&
1625              (0 != memcmp (&peers[i],
1626                            &rps_peers[0].peer_id,
1627                            sizeof(struct GNUNET_PeerIdentity))))
1628     {
1629       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1630                   "Received a peer id outside sub (lonely)\n");
1631       ok = 1;
1632     }
1633   }
1634 }
1635
1636
1637 static void
1638 sub_post (struct RPSPeer *rps_peer)
1639 {
1640   if (0 != rps_peer->index)
1641     GNUNET_RPS_sub_stop (rps_peer->rps_handle, "test");
1642   else
1643     GNUNET_RPS_sub_stop (rps_peer->rps_handle, "lonely");
1644 }
1645
1646
1647 static void
1648 sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
1649 {
1650   (void) rps_peer;
1651
1652   if (0 != rps_peer->index)
1653     GNUNET_RPS_sub_start (h, "test");
1654   else
1655     GNUNET_RPS_sub_start (h, "lonely");    /* have a group of one */
1656   rps_peer->rps_srh = GNUNET_RPS_stream_request (h,
1657                                                  &got_stream_peer_cb,
1658                                                  rps_peer);
1659 }
1660
1661
1662 /***********************************
1663 * PROFILER
1664 ***********************************/
1665
1666 /**
1667  * Callback to be called when RPS service is started or stopped at peers
1668  *
1669  * @param cls NULL
1670  * @param op the operation handle
1671  * @param emsg NULL on success; otherwise an error description
1672  */
1673 static void
1674 churn_cb (void *cls,
1675           struct GNUNET_TESTBED_Operation *op,
1676           const char *emsg)
1677 {
1678   (void) op;
1679   // FIXME
1680   struct OpListEntry *entry = cls;
1681
1682   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1683   {
1684     return;
1685   }
1686
1687   GNUNET_TESTBED_operation_done (entry->op);
1688   if (NULL != emsg)
1689   {
1690     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1691                 "Failed to start/stop RPS at a peer\n");
1692     GNUNET_SCHEDULER_shutdown ();
1693     return;
1694   }
1695   GNUNET_assert (0 != entry->delta);
1696
1697   num_peers_online += entry->delta;
1698
1699   if (PEER_GO_OFFLINE == entry->delta)
1700   {   /* Peer hopefully just went offline */
1701     if (GNUNET_YES != rps_peers[entry->index].online)
1702     {
1703       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1704                   "peer %s was expected to go offline but is still marked as online\n",
1705                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1706       GNUNET_break (0);
1707     }
1708     else
1709     {
1710       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1711                   "peer %s probably went offline as expected\n",
1712                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1713     }
1714     rps_peers[entry->index].online = GNUNET_NO;
1715   }
1716
1717   else if (PEER_GO_ONLINE < entry->delta)
1718   {   /* Peer hopefully just went online */
1719     if (GNUNET_NO != rps_peers[entry->index].online)
1720     {
1721       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1722                   "peer %s was expected to go online but is still marked as offline\n",
1723                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1724       GNUNET_break (0);
1725     }
1726     else
1727     {
1728       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1729                   "peer %s probably went online as expected\n",
1730                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1731       if (NULL != cur_test_run.pre_test)
1732       {
1733         cur_test_run.pre_test (&rps_peers[entry->index],
1734                                rps_peers[entry->index].rps_handle);
1735         schedule_missing_requests (&rps_peers[entry->index]);
1736       }
1737     }
1738     rps_peers[entry->index].online = GNUNET_YES;
1739   }
1740   else
1741   {
1742     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1743                 "Invalid value for delta: %i\n", entry->delta);
1744     GNUNET_break (0);
1745   }
1746
1747   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
1748   rps_peers[entry->index].entry_op_manage = NULL;
1749   GNUNET_free (entry);
1750   // if (num_peers_in_round[current_round] == peers_running)
1751   //  run_round ();
1752 }
1753
1754
1755 /**
1756  * @brief Set the rps-service up or down for a specific peer
1757  *
1758  * @param i index of action
1759  * @param j index of peer
1760  * @param delta (#PEER_ONLINE_DELTA) down (-1) or up (1)
1761  * @param prob_go_on_off the probability of the action
1762  */
1763 static void
1764 manage_service_wrapper (unsigned int i, unsigned int j,
1765                         enum PEER_ONLINE_DELTA delta,
1766                         double prob_go_on_off)
1767 {
1768   struct OpListEntry *entry = NULL;
1769   uint32_t prob;
1770
1771   /* make sure that management operation is not already scheduled */
1772   if (NULL != rps_peers[j].entry_op_manage)
1773   {
1774     return;
1775   }
1776
1777   prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1778                                    UINT32_MAX);
1779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1780               "%u. selected peer (%u: %s) is %s.\n",
1781               i,
1782               j,
1783               GNUNET_i2s (rps_peers[j].peer_id),
1784               (PEER_GO_ONLINE == delta) ? "online" : "offline");
1785   if (prob < prob_go_on_off * UINT32_MAX)
1786   {
1787     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1788                 "%s goes %s\n",
1789                 GNUNET_i2s (rps_peers[j].peer_id),
1790                 (PEER_GO_OFFLINE == delta) ? "offline" : "online");
1791
1792     if (PEER_GO_OFFLINE == delta)
1793       cancel_pending_req_rep (&rps_peers[j]);
1794     entry = make_oplist_entry ();
1795     entry->delta = delta;
1796     entry->index = j;
1797     entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
1798                                                     testbed_peers[j],
1799                                                     "rps",
1800                                                     &churn_cb,
1801                                                     entry,
1802                                                     (PEER_GO_OFFLINE == delta) ?
1803                                                     0 : 1);
1804     rps_peers[j].entry_op_manage = entry;
1805   }
1806 }
1807
1808
1809 static void
1810 churn (void *cls)
1811 {
1812   (void) cls;
1813   unsigned int i;
1814   unsigned int j;
1815   double portion_online;
1816   unsigned int *permut;
1817   double prob_go_offline;
1818   double portion_go_online;
1819   double portion_go_offline;
1820
1821   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1822   {
1823     return;
1824   }
1825   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1826               "Churn function executing\n");
1827
1828   churn_task = NULL; /* Should be invalid by now */
1829
1830   /* Compute the probability for an online peer to go offline
1831    * this round */
1832   portion_online = num_peers_online * 1.0 / num_peers;
1833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1834               "Portion online: %f\n",
1835               portion_online);
1836   portion_go_online = ((1 - portion_online) * .5 * .66);
1837   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1838               "Portion that should go online: %f\n",
1839               portion_go_online);
1840   portion_go_offline = (portion_online + portion_go_online) - .75;
1841   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1842               "Portion that probably goes offline: %f\n",
1843               portion_go_offline);
1844   prob_go_offline = portion_go_offline / (portion_online * .5);
1845   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1846               "Probability of a selected online peer to go offline: %f\n",
1847               prob_go_offline);
1848
1849   permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK,
1850                                          (unsigned int) num_peers);
1851
1852   /* Go over 50% randomly chosen peers */
1853   for (i = 0; i < .5 * num_peers; i++)
1854   {
1855     j = permut[i];
1856
1857     /* If online, shut down with certain probability */
1858     if (GNUNET_YES == rps_peers[j].online)
1859     {
1860       manage_service_wrapper (i, j, -1, prob_go_offline);
1861     }
1862
1863     /* If offline, restart with certain probability */
1864     else if (GNUNET_NO == rps_peers[j].online)
1865     {
1866       manage_service_wrapper (i, j, 1, 0.66);
1867     }
1868   }
1869
1870   GNUNET_free (permut);
1871
1872   churn_task = GNUNET_SCHEDULER_add_delayed (
1873     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1874     churn,
1875     NULL);
1876 }
1877
1878
1879 /**
1880  * Initialise given RPSPeer
1881  */
1882 static void
1883 profiler_init_peer (struct RPSPeer *rps_peer)
1884 {
1885   if (num_peers - 1 == rps_peer->index)
1886     rps_peer->num_ids_to_request = cur_test_run.num_requests;
1887 }
1888
1889
1890 /**
1891  * Callback to call on receipt of a reply
1892  *
1893  * @param cls closure
1894  * @param n number of peers
1895  * @param recv_peers the received peers
1896  */
1897 static void
1898 profiler_reply_handle (void *cls,
1899                        uint64_t n,
1900                        const struct GNUNET_PeerIdentity *recv_peers)
1901 {
1902   struct RPSPeer *rps_peer;
1903   struct RPSPeer *rcv_rps_peer;
1904   char *file_name;
1905   char *file_name_dh;
1906   unsigned int i;
1907   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1908
1909   rps_peer = pending_rep->rps_peer;
1910   file_name = "/tmp/rps/received_ids";
1911   file_name_dh = "/tmp/rps/diehard_input";
1912   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1913               "[%s] got %" PRIu64 " peers:\n",
1914               GNUNET_i2s (rps_peer->peer_id),
1915               n);
1916   for (i = 0; i < n; i++)
1917   {
1918     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919                 "%u: %s\n",
1920                 i,
1921                 GNUNET_i2s (&recv_peers[i]));
1922     tofile (file_name,
1923             "%s\n",
1924             GNUNET_i2s_full (&recv_peers[i]));
1925     rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
1926     GNUNET_assert (NULL != rcv_rps_peer);
1927     tofile (file_name_dh,
1928             "%" PRIu32 "\n",
1929             (uint32_t) rcv_rps_peer->index);
1930   }
1931   default_reply_handle (cls, n, recv_peers);
1932 }
1933
1934
1935 static void
1936 profiler_cb (struct RPSPeer *rps_peer)
1937 {
1938   if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
1939   {
1940     return;
1941   }
1942
1943   /* Start churn */
1944   if ((HAVE_CHURN == cur_test_run.have_churn) && (NULL == churn_task))
1945   {
1946     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1947                 "Starting churn task\n");
1948     churn_task = GNUNET_SCHEDULER_add_delayed (
1949       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1950       churn,
1951       NULL);
1952   }
1953   else
1954   {
1955     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1956                 "Not starting churn task\n");
1957   }
1958
1959   /* Only request peer ids at one peer.
1960    * (It's the before-last because last one is target of the focussed attack.)
1961    */
1962   if (eval_peer == rps_peer)
1963     schedule_missing_requests (rps_peer);
1964 }
1965
1966
1967 /**
1968  * Function called from #profiler_eval with a filename.
1969  *
1970  * @param cls closure
1971  * @param filename complete filename (absolute path)
1972  * @return #GNUNET_OK to continue to iterate,
1973  *  #GNUNET_NO to stop iteration with no error,
1974  *  #GNUNET_SYSERR to abort iteration with error!
1975  */
1976 int
1977 file_name_cb (void *cls, const char *filename)
1978 {
1979   (void) cls;
1980
1981   if (NULL != strstr (filename, "sampler_el"))
1982   {
1983     struct RPS_SamplerElement *s_elem;
1984     struct GNUNET_CRYPTO_AuthKey auth_key;
1985     const char *key_char;
1986     uint32_t i;
1987
1988     key_char = filename + 20;   /* Length of "/tmp/rps/sampler_el-" */
1989     tofile (filename, "--------------------------\n");
1990
1991     auth_key = string_to_auth_key (key_char);
1992     s_elem = RPS_sampler_elem_create ();
1993     RPS_sampler_elem_set (s_elem, auth_key);
1994
1995     for (i = 0; i < num_peers; i++)
1996     {
1997       RPS_sampler_elem_next (s_elem, &rps_peer_ids[i]);
1998     }
1999     RPS_sampler_elem_destroy (s_elem);
2000   }
2001   return GNUNET_OK;
2002 }
2003
2004
2005 /**
2006  * This is run after the test finished.
2007  *
2008  * Compute all perfect samples.
2009  */
2010 int
2011 profiler_eval (void)
2012 {
2013   /* Compute perfect sample for each sampler element */
2014   if (-1 == GNUNET_DISK_directory_scan ("/tmp/rps/", file_name_cb, NULL))
2015   {
2016     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Scan of directory failed\n");
2017   }
2018
2019   return evaluate ();
2020 }
2021
2022
2023 /**
2024  * @brief is b in view of a?
2025  *
2026  * @param a
2027  * @param b
2028  *
2029  * @return
2030  */
2031 static int
2032 is_in_view (uint32_t a, uint32_t b)
2033 {
2034   uint32_t i;
2035
2036   for (i = 0; i < rps_peers[a].cur_view_count; i++)
2037   {
2038     if (0 == memcmp (rps_peers[b].peer_id,
2039                      &rps_peers[a].cur_view[i],
2040                      sizeof(struct GNUNET_PeerIdentity)))
2041     {
2042       return GNUNET_YES;
2043     }
2044   }
2045   return GNUNET_NO;
2046 }
2047
2048
2049 static uint32_t
2050 get_idx_of_pid (const struct GNUNET_PeerIdentity *pid)
2051 {
2052   uint32_t i;
2053
2054   for (i = 0; i < num_peers; i++)
2055   {
2056     if (0 == memcmp (pid,
2057                      rps_peers[i].peer_id,
2058                      sizeof(struct GNUNET_PeerIdentity)))
2059     {
2060       return i;
2061     }
2062   }
2063   // return 0; /* Should not happen - make compiler happy */
2064   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2065               "No known _PeerIdentity %s!\n",
2066               GNUNET_i2s_full (pid));
2067   GNUNET_assert (0);
2068 }
2069
2070
2071 /**
2072  * @brief Counts number of peers in view of a that have b in their view
2073  *
2074  * @param a
2075  * @param uint32_tb
2076  *
2077  * @return
2078  */
2079 static uint32_t
2080 count_containing_views (uint32_t a, uint32_t b)
2081 {
2082   uint32_t i;
2083   uint32_t peer_idx;
2084   uint32_t count = 0;
2085
2086   for (i = 0; i < rps_peers[a].cur_view_count; i++)
2087   {
2088     peer_idx = get_idx_of_pid (&rps_peers[a].cur_view[i]);
2089     if (GNUNET_YES == is_in_view (peer_idx, b))
2090     {
2091       count++;
2092     }
2093   }
2094   return count;
2095 }
2096
2097
2098 /**
2099  * @brief Computes the probability for each other peer to be selected by the
2100  * sampling process based on the views of all peers
2101  *
2102  * @param peer_idx index of the peer that is about to sample
2103  */
2104 static void
2105 compute_probabilities (uint32_t peer_idx)
2106 {
2107   // double probs[num_peers] = { 0 };
2108   double probs[num_peers];
2109   size_t probs_as_str_size = (num_peers * 10 + 1) * sizeof(char);
2110   char *probs_as_str = GNUNET_malloc (probs_as_str_size);
2111   char *probs_as_str_cpy;
2112   uint32_t i;
2113   double prob_push;
2114   double prob_pull;
2115   uint32_t view_size;
2116   uint32_t cont_views;
2117   uint32_t number_of_being_in_pull_events;
2118   int tmp;
2119   uint32_t count_non_zero_prob = 0;
2120
2121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2122               "Computing probabilities for peer %" PRIu32 "\n", peer_idx);
2123   /* Firstly without knowledge of old views */
2124   for (i = 0; i < num_peers; i++)
2125   {
2126     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2127                 "\tfor peer %" PRIu32 ":\n", i);
2128     view_size = rps_peers[i].cur_view_count;
2129     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2130                 "\t\tview_size: %" PRIu32 "\n", view_size);
2131     /* For peer i the probability of being sampled is
2132      * evenly distributed among all possibly observed peers. */
2133     /* We could have observed a peer in three cases:
2134      *   1. peer sent a push
2135      *   2. peer was contained in a pull reply
2136      *   3. peer was in history (sampler) - ignored for now */
2137     /* 1. Probability of having received a push from peer i */
2138     if ((GNUNET_YES == is_in_view (i, peer_idx)) &&
2139         (1 <= (0.45 * view_size)))
2140     {
2141       prob_push = 1.0 * binom (0.45 * view_size, 1)
2142                   /
2143                   binom (view_size, 0.45 * view_size);
2144       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2145                   "\t\t%" PRIu32 " is in %" PRIu32 "'s view, prob: %f\n",
2146                   peer_idx,
2147                   i,
2148                   prob_push);
2149       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2150                   "\t\tposs choices from view: %" PRIu32 ", containing i: %"
2151                   PRIu32 "\n",
2152                   binom (view_size, 0.45 * view_size),
2153                   binom (0.45 * view_size, 1));
2154     }
2155     else
2156     {
2157       prob_push = 0;
2158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2159                   "\t\t%" PRIu32 " is not in %" PRIu32 "'s view, prob: 0\n",
2160                   peer_idx,
2161                   i);
2162     }
2163     /* 2. Probability of peer i being contained in pulls */
2164     view_size = rps_peers[peer_idx].cur_view_count;
2165     cont_views = count_containing_views (peer_idx, i);
2166     number_of_being_in_pull_events =
2167       (binom (view_size, 0.45 * view_size)
2168        - binom (view_size - cont_views, 0.45 * view_size));
2169     if (0 != number_of_being_in_pull_events)
2170     {
2171       prob_pull = number_of_being_in_pull_events
2172                   /
2173                   (1.0 * binom (view_size, 0.45 * view_size));
2174     }
2175     else
2176     {
2177       prob_pull = 0;
2178     }
2179     probs[i] = prob_push + prob_pull - (prob_push * prob_pull);
2180     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2181                 "\t\t%" PRIu32 " has %" PRIu32 " of %" PRIu32
2182                 " peers in its view who know %" PRIu32 " prob: %f\n",
2183                 peer_idx,
2184                 cont_views,
2185                 view_size,
2186                 i,
2187                 prob_pull);
2188     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2189                 "\t\tnumber of possible pull combinations: %" PRIu32 "\n",
2190                 binom (view_size, 0.45 * view_size));
2191     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2192                 "\t\tnumber of possible pull combinations without %" PRIu32
2193                 ": %" PRIu32 "\n",
2194                 i,
2195                 binom (view_size - cont_views, 0.45 * view_size));
2196     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2197                 "\t\tnumber of possible pull combinations with %" PRIu32
2198                 ": %" PRIu32 "\n",
2199                 i,
2200                 number_of_being_in_pull_events);
2201
2202     if (0 != probs[i])
2203       count_non_zero_prob++;
2204   }
2205   /* normalize */
2206   if (0 != count_non_zero_prob)
2207   {
2208     for (i = 0; i < num_peers; i++)
2209     {
2210       probs[i] = probs[i] * (1.0 / count_non_zero_prob);
2211     }
2212   }
2213   else
2214   {
2215     for (i = 0; i < num_peers; i++)
2216     {
2217       probs[i] = 0;
2218     }
2219   }
2220   /* str repr */
2221   for (i = 0; i < num_peers; i++)
2222   {
2223     probs_as_str_cpy = GNUNET_strndup (probs_as_str, probs_as_str_size);
2224     tmp = GNUNET_snprintf (probs_as_str,
2225                            probs_as_str_size,
2226                            "%s %7.6f", probs_as_str_cpy, probs[i]);
2227     GNUNET_free (probs_as_str_cpy);
2228     GNUNET_assert (0 <= tmp);
2229   }
2230
2231   to_file_w_len (rps_peers[peer_idx].file_name_probs,
2232                  probs_as_str_size,
2233                  probs_as_str);
2234   GNUNET_free (probs_as_str);
2235 }
2236
2237
2238 /**
2239  * @brief This counts the number of peers in which views a given peer occurs.
2240  *
2241  * It also stores this value in the rps peer.
2242  *
2243  * @param peer_idx the index of the peer to count the representation
2244  *
2245  * @return the number of occurrences
2246  */
2247 static uint32_t
2248 count_peer_in_views_2 (uint32_t peer_idx)
2249 {
2250   uint32_t i, j;
2251   uint32_t count = 0;
2252
2253   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
2254   {
2255     for (j = 0; j < rps_peers[i].cur_view_count; j++)   /* entry in view */
2256     {
2257       if (0 == memcmp (rps_peers[peer_idx].peer_id,
2258                        &rps_peers[i].cur_view[j],
2259                        sizeof(struct GNUNET_PeerIdentity)))
2260       {
2261         count++;
2262         break;
2263       }
2264     }
2265   }
2266   rps_peers[peer_idx].count_in_views = count;
2267   return count;
2268 }
2269
2270
2271 static uint32_t
2272 cumulated_view_sizes ()
2273 {
2274   uint32_t i;
2275
2276   view_sizes = 0;
2277   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
2278   {
2279     view_sizes += rps_peers[i].cur_view_count;
2280   }
2281   return view_sizes;
2282 }
2283
2284
2285 static void
2286 count_peer_in_views (uint32_t *count_peers)
2287 {
2288   uint32_t i, j;
2289
2290   for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
2291   {
2292     for (j = 0; j < rps_peers[i].cur_view_count; j++)   /* entry in view */
2293     {
2294       if (0 == memcmp (rps_peers[i].peer_id,
2295                        &rps_peers[i].cur_view[j],
2296                        sizeof(struct GNUNET_PeerIdentity)))
2297       {
2298         count_peers[i]++;
2299       }
2300     }
2301   }
2302 }
2303
2304
2305 void
2306 compute_diversity ()
2307 {
2308   uint32_t i;
2309   /* ith entry represents the numer of occurrences in other peer's views */
2310   uint32_t *count_peers = GNUNET_new_array (num_peers, uint32_t);
2311   uint32_t views_total_size;
2312   double expected;
2313   /* deviation from expected number of peers */
2314   double *deviation = GNUNET_new_array (num_peers, double);
2315
2316   views_total_size = 0;
2317   expected = 0;
2318
2319   /* For each peer count its representation in other peer's views*/
2320   for (i = 0; i < num_peers; i++) /* Peer to count */
2321   {
2322     views_total_size += rps_peers[i].cur_view_count;
2323     count_peer_in_views (count_peers);
2324     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325                 "Counted representation of %" PRIu32 "th peer [%s]: %" PRIu32
2326                 "\n",
2327                 i,
2328                 GNUNET_i2s (rps_peers[i].peer_id),
2329                 count_peers[i]);
2330   }
2331
2332   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2333               "size of all views combined: %" PRIu32 "\n",
2334               views_total_size);
2335   expected = ((double) 1 / num_peers) * views_total_size;
2336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2337               "Expected number of occurrences of each peer in all views: %f\n",
2338               expected);
2339   for (i = 0; i < num_peers; i++) /* Peer to count */
2340   {
2341     deviation[i] = expected - count_peers[i];
2342     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2343                 "Deviation from expectation: %f\n", deviation[i]);
2344   }
2345   GNUNET_free (count_peers);
2346   GNUNET_free (deviation);
2347 }
2348
2349
2350 void
2351 print_view_sizes ()
2352 {
2353   uint32_t i;
2354
2355   for (i = 0; i < num_peers; i++) /* Peer to count */
2356   {
2357     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2358                 "View size of %" PRIu32 ". [%s] is %" PRIu32 "\n",
2359                 i,
2360                 GNUNET_i2s (rps_peers[i].peer_id),
2361                 rps_peers[i].cur_view_count);
2362   }
2363 }
2364
2365
2366 void
2367 all_views_updated_cb ()
2368 {
2369   compute_diversity ();
2370   print_view_sizes ();
2371 }
2372
2373
2374 void
2375 view_update_cb (void *cls,
2376                 uint64_t view_size,
2377                 const struct GNUNET_PeerIdentity *peers)
2378 {
2379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2380               "View was updated (%" PRIu64 ")\n", view_size);
2381   struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
2382   to_file ("/tmp/rps/view_sizes.txt",
2383            "%" PRIu64 " %" PRIu32 "",
2384            rps_peer->index,
2385            view_size);
2386   for (int i = 0; i < view_size; i++)
2387   {
2388     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2389                 "\t%s\n", GNUNET_i2s (&peers[i]));
2390   }
2391   GNUNET_array_grow (rps_peer->cur_view,
2392                      rps_peer->cur_view_count,
2393                      view_size);
2394   // *rps_peer->cur_view = *peers;
2395   GNUNET_memcpy (rps_peer->cur_view,
2396                  peers,
2397                  view_size * sizeof(struct GNUNET_PeerIdentity));
2398   to_file ("/tmp/rps/count_in_views.txt",
2399            "%" PRIu64 " %" PRIu32 "",
2400            rps_peer->index,
2401            count_peer_in_views_2 (rps_peer->index));
2402   cumulated_view_sizes ();
2403   if (0 != view_size)
2404   {
2405     to_file ("/tmp/rps/repr.txt",
2406              "%" PRIu64  /* index */
2407              " %" PRIu32  /* occurrence in views */
2408              " %" PRIu32  /* view sizes */
2409              " %f"  /* fraction of repr in views */
2410              " %f"  /* average view size */
2411              " %f"  /* prob of occurrence in view slot */
2412              " %f" "",  /* exp frac of repr in views */
2413              rps_peer->index,
2414              count_peer_in_views_2 (rps_peer->index),
2415              view_sizes,
2416              count_peer_in_views_2 (rps_peer->index) / (view_size * 1.0), /* fraction of representation in views */
2417              view_sizes / (view_size * 1.0),  /* average view size */
2418              1.0 / view_size,  /* prob of occurrence in view slot */
2419              (1.0 / view_size) * (view_sizes / view_size)  /* expected fraction of repr in views */
2420              );
2421   }
2422   compute_probabilities (rps_peer->index);
2423   all_views_updated_cb ();
2424 }
2425
2426
2427 static void
2428 pre_profiler (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
2429 {
2430   rps_peer->file_name_probs =
2431     store_prefix_file_name (rps_peer->peer_id, "probs");
2432   GNUNET_RPS_view_request (h, 0, view_update_cb, rps_peer);
2433 }
2434
2435
2436 void
2437 write_final_stats (void)
2438 {
2439   uint32_t i;
2440
2441   for (i = 0; i < num_peers; i++)
2442   {
2443     to_file ("/tmp/rps/final_stats.dat",
2444              "%" PRIu32 " "  /* index */
2445              "%s %"  /* id */
2446              PRIu64 " %"  /* rounds */
2447              PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64
2448              " %"                                                                     /* blocking */
2449              PRIu64 " %" PRIu64 " %" PRIu64 " %"  /* issued */
2450              PRIu64 " %" PRIu64 " %" PRIu64 " %"  /* sent */
2451              PRIu64 " %" PRIu64 " %" PRIu64 /* recv */,
2452              i,
2453              GNUNET_i2s (rps_peers[i].peer_id),
2454              rps_peers[i].num_rounds,
2455              rps_peers[i].num_blocks,
2456              rps_peers[i].num_blocks_many_push,
2457              rps_peers[i].num_blocks_no_push,
2458              rps_peers[i].num_blocks_no_pull,
2459              rps_peers[i].num_blocks_many_push_no_pull,
2460              rps_peers[i].num_blocks_no_push_no_pull,
2461              rps_peers[i].num_issued_push,
2462              rps_peers[i].num_issued_pull_req,
2463              rps_peers[i].num_issued_pull_rep,
2464              rps_peers[i].num_sent_push,
2465              rps_peers[i].num_sent_pull_req,
2466              rps_peers[i].num_sent_pull_rep,
2467              rps_peers[i].num_recv_push,
2468              rps_peers[i].num_recv_pull_req,
2469              rps_peers[i].num_recv_pull_rep);
2470   }
2471 }
2472
2473
2474 /**
2475  * Continuation called by #GNUNET_STATISTICS_get() functions.
2476  *
2477  * Remembers that this specific statistics value was received for this peer.
2478  * Checks whether all peers received their statistics yet.
2479  * Issues the shutdown.
2480  *
2481  * @param cls closure
2482  * @param success #GNUNET_OK if statistics were
2483  *        successfully obtained, #GNUNET_SYSERR if not.
2484  */
2485 void
2486 post_test_shutdown_ready_cb (void *cls,
2487                              int success)
2488 {
2489   struct STATcls *stat_cls = (struct STATcls *) cls;
2490   struct RPSPeer *rps_peer = stat_cls->rps_peer;
2491
2492   if (GNUNET_OK == success)
2493   {
2494     /* set flag that we we got the value */
2495     rps_peer->stat_collected_flags |= stat_cls->stat_type;
2496   }
2497   else
2498   {
2499     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2500                 "Peer %u did not receive statistics value\n",
2501                 rps_peer->index);
2502     GNUNET_free (stat_cls);
2503     GNUNET_break (0);
2504   }
2505
2506   if ((NULL != rps_peer->stat_op) &&
2507       (GNUNET_YES == check_statistics_collect_completed_single_peer (
2508          rps_peer)) )
2509   {
2510     GNUNET_TESTBED_operation_done (rps_peer->stat_op);
2511   }
2512
2513   write_final_stats ();
2514   if (GNUNET_YES == check_statistics_collect_completed ())
2515   {
2516     // write_final_stats ();
2517     GNUNET_free (stat_cls);
2518     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2519                 "Shutting down\n");
2520     GNUNET_SCHEDULER_shutdown ();
2521   }
2522   else
2523   {
2524     GNUNET_free (stat_cls);
2525   }
2526 }
2527
2528
2529 /**
2530  * @brief Converts string representation to the corresponding #STAT_TYPE enum.
2531  *
2532  * @param stat_str string representation of statistics specifier
2533  *
2534  * @return corresponding enum
2535  */
2536 enum STAT_TYPE
2537 stat_str_2_type (const char *stat_str)
2538 {
2539   if (0 == strncmp ("# rounds blocked - no pull replies", stat_str, strlen (
2540                       "# rounds blocked - no pull replies")))
2541   {
2542     return STAT_TYPE_BLOCKS_NO_PULL;
2543   }
2544   else if (0 == strncmp ("# rounds blocked - too many pushes, no pull replies",
2545                          stat_str, strlen (
2546                            "# rounds blocked - too many pushes, no pull replies")))
2547   {
2548     return STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL;
2549   }
2550   else if (0 == strncmp ("# rounds blocked - too many pushes", stat_str,
2551                          strlen ("# rounds blocked - too many pushes")))
2552   {
2553     return STAT_TYPE_BLOCKS_MANY_PUSH;
2554   }
2555   else if (0 == strncmp ("# rounds blocked - no pushes, no pull replies",
2556                          stat_str, strlen (
2557                            "# rounds blocked - no pushes, no pull replies")))
2558   {
2559     return STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL;
2560   }
2561   else if (0 == strncmp ("# rounds blocked - no pushes", stat_str, strlen (
2562                            "# rounds blocked - no pushes")))
2563   {
2564     return STAT_TYPE_BLOCKS_NO_PUSH;
2565   }
2566   else if (0 == strncmp ("# rounds blocked", stat_str, strlen (
2567                            "# rounds blocked")))
2568   {
2569     return STAT_TYPE_BLOCKS;
2570   }
2571   else if (0 == strncmp ("# rounds", stat_str, strlen ("# rounds")))
2572   {
2573     return STAT_TYPE_ROUNDS;
2574   }
2575   else if (0 == strncmp ("# push send issued", stat_str, strlen (
2576                            "# push send issued")))
2577   {
2578     return STAT_TYPE_ISSUED_PUSH_SEND;
2579   }
2580   else if (0 == strncmp ("# pull request send issued", stat_str, strlen (
2581                            "# pull request send issued")))
2582   {
2583     return STAT_TYPE_ISSUED_PULL_REQ;
2584   }
2585   else if (0 == strncmp ("# pull reply send issued", stat_str, strlen (
2586                            "# pull reply send issued")))
2587   {
2588     return STAT_TYPE_ISSUED_PULL_REP;
2589   }
2590   else if (0 == strncmp ("# pushes sent", stat_str, strlen ("# pushes sent")))
2591   {
2592     return STAT_TYPE_SENT_PUSH_SEND;
2593   }
2594   else if (0 == strncmp ("# pull requests sent", stat_str, strlen (
2595                            "# pull requests sent")))
2596   {
2597     return STAT_TYPE_SENT_PULL_REQ;
2598   }
2599   else if (0 == strncmp ("# pull replys sent", stat_str, strlen (
2600                            "# pull replys sent")))
2601   {
2602     return STAT_TYPE_SENT_PULL_REP;
2603   }
2604   else if (0 == strncmp ("# push message received", stat_str, strlen (
2605                            "# push message received")))
2606   {
2607     return STAT_TYPE_RECV_PUSH_SEND;
2608   }
2609   else if (0 == strncmp ("# pull request message received", stat_str, strlen (
2610                            "# pull request message received")))
2611   {
2612     return STAT_TYPE_RECV_PULL_REQ;
2613   }
2614   else if (0 == strncmp ("# pull reply messages received", stat_str, strlen (
2615                            "# pull reply messages received")))
2616   {
2617     return STAT_TYPE_RECV_PULL_REP;
2618   }
2619   return STAT_TYPE_MAX;
2620 }
2621
2622
2623 /**
2624  * @brief Converts #STAT_TYPE enum to the equivalent string representation that
2625  * is stored with the statistics service.
2626  *
2627  * @param stat_type #STAT_TYPE enum
2628  *
2629  * @return string representation that matches statistics value
2630  */
2631 char*
2632 stat_type_2_str (enum STAT_TYPE stat_type)
2633 {
2634   switch (stat_type)
2635   {
2636   case STAT_TYPE_ROUNDS:
2637     return "# rounds";
2638
2639   case STAT_TYPE_BLOCKS:
2640     return "# rounds blocked";
2641
2642   case STAT_TYPE_BLOCKS_MANY_PUSH:
2643     return "# rounds blocked - too many pushes";
2644
2645   case STAT_TYPE_BLOCKS_NO_PUSH:
2646     return "# rounds blocked - no pushes";
2647
2648   case STAT_TYPE_BLOCKS_NO_PULL:
2649     return "# rounds blocked - no pull replies";
2650
2651   case STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL:
2652     return "# rounds blocked - too many pushes, no pull replies";
2653
2654   case STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL:
2655     return "# rounds blocked - no pushes, no pull replies";
2656
2657   case STAT_TYPE_ISSUED_PUSH_SEND:
2658     return "# push send issued";
2659
2660   case STAT_TYPE_ISSUED_PULL_REQ:
2661     return "# pull request send issued";
2662
2663   case STAT_TYPE_ISSUED_PULL_REP:
2664     return "# pull reply send issued";
2665
2666   case STAT_TYPE_SENT_PUSH_SEND:
2667     return "# pushes sent";
2668
2669   case STAT_TYPE_SENT_PULL_REQ:
2670     return "# pull requests sent";
2671
2672   case STAT_TYPE_SENT_PULL_REP:
2673     return "# pull replys sent";
2674
2675   case STAT_TYPE_RECV_PUSH_SEND:
2676     return "# push message received";
2677
2678   case STAT_TYPE_RECV_PULL_REQ:
2679     return "# pull request message received";
2680
2681   case STAT_TYPE_RECV_PULL_REP:
2682     return "# pull reply messages received";
2683
2684   case STAT_TYPE_MAX:
2685   default:
2686     return "ERROR";
2687     ;
2688   }
2689 }
2690
2691
2692 /**
2693  * Callback function to process statistic values.
2694  *
2695  * @param cls closure
2696  * @param subsystem name of subsystem that created the statistic
2697  * @param name the name of the datum
2698  * @param value the current value
2699  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
2700  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
2701  */
2702 int
2703 stat_iterator (void *cls,
2704                const char *subsystem,
2705                const char *name,
2706                uint64_t value,
2707                int is_persistent)
2708 {
2709   (void) subsystem;
2710   (void) is_persistent;
2711   const struct STATcls *stat_cls = (const struct STATcls *) cls;
2712   struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer;
2713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n",
2714               // stat_type_2_str (stat_cls->stat_type),
2715               name,
2716               value);
2717   to_file (rps_peer->file_name_stats,
2718            "%s: %" PRIu64 "\n",
2719            name,
2720            value);
2721   switch (stat_str_2_type (name))
2722   {
2723   case STAT_TYPE_ROUNDS:
2724     rps_peer->num_rounds = value;
2725     break;
2726
2727   case STAT_TYPE_BLOCKS:
2728     rps_peer->num_blocks = value;
2729     break;
2730
2731   case STAT_TYPE_BLOCKS_MANY_PUSH:
2732     rps_peer->num_blocks_many_push = value;
2733     break;
2734
2735   case STAT_TYPE_BLOCKS_NO_PUSH:
2736     rps_peer->num_blocks_no_push = value;
2737     break;
2738
2739   case STAT_TYPE_BLOCKS_NO_PULL:
2740     rps_peer->num_blocks_no_pull = value;
2741     break;
2742
2743   case STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL:
2744     rps_peer->num_blocks_many_push_no_pull = value;
2745     break;
2746
2747   case STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL:
2748     rps_peer->num_blocks_no_push_no_pull = value;
2749     break;
2750
2751   case STAT_TYPE_ISSUED_PUSH_SEND:
2752     rps_peer->num_issued_push = value;
2753     break;
2754
2755   case STAT_TYPE_ISSUED_PULL_REQ:
2756     rps_peer->num_issued_pull_req = value;
2757     break;
2758
2759   case STAT_TYPE_ISSUED_PULL_REP:
2760     rps_peer->num_issued_pull_rep = value;
2761     break;
2762
2763   case STAT_TYPE_SENT_PUSH_SEND:
2764     rps_peer->num_sent_push = value;
2765     break;
2766
2767   case STAT_TYPE_SENT_PULL_REQ:
2768     rps_peer->num_sent_pull_req = value;
2769     break;
2770
2771   case STAT_TYPE_SENT_PULL_REP:
2772     rps_peer->num_sent_pull_rep = value;
2773     break;
2774
2775   case STAT_TYPE_RECV_PUSH_SEND:
2776     rps_peer->num_recv_push = value;
2777     break;
2778
2779   case STAT_TYPE_RECV_PULL_REQ:
2780     rps_peer->num_recv_pull_req = value;
2781     break;
2782
2783   case STAT_TYPE_RECV_PULL_REP:
2784     rps_peer->num_recv_pull_rep = value;
2785     break;
2786
2787   case STAT_TYPE_MAX:
2788   default:
2789     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2790                 "Unknown statistics string: %s\n",
2791                 name);
2792     break;
2793   }
2794   return GNUNET_OK;
2795 }
2796
2797
2798 void
2799 post_profiler (struct RPSPeer *rps_peer)
2800 {
2801   if (COLLECT_STATISTICS != cur_test_run.have_collect_statistics)
2802   {
2803     return;
2804   }
2805
2806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2807               "Going to request statistic values with mask 0x%" PRIx32 "\n",
2808               cur_test_run.stat_collect_flags);
2809
2810   struct STATcls *stat_cls;
2811   uint32_t stat_type;
2812   for (stat_type = STAT_TYPE_ROUNDS;
2813        stat_type < STAT_TYPE_MAX;
2814        stat_type = stat_type << 1)
2815   {
2816     if (stat_type & cur_test_run.stat_collect_flags)
2817     {
2818       stat_cls = GNUNET_malloc (sizeof(struct STATcls));
2819       stat_cls->rps_peer = rps_peer;
2820       stat_cls->stat_type = stat_type;
2821       rps_peer->file_name_stats =
2822         store_prefix_file_name (rps_peer->peer_id, "stats");
2823       GNUNET_STATISTICS_get (rps_peer->stats_h,
2824                              "rps",
2825                              stat_type_2_str (stat_type),
2826                              post_test_shutdown_ready_cb,
2827                              stat_iterator,
2828                              (struct STATcls *) stat_cls);
2829       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2830                   "Requested statistics for %s (peer %" PRIu32 ")\n",
2831                   stat_type_2_str (stat_type),
2832                   rps_peer->index);
2833     }
2834   }
2835 }
2836
2837
2838 /***********************************************************************
2839 * /Definition of tests
2840 ***********************************************************************/
2841
2842
2843 /**
2844  * Actual "main" function for the testcase.
2845  *
2846  * @param cls closure
2847  * @param h the run handle
2848  * @param n_peers number of peers in 'peers'
2849  * @param peers handle to peers run in the testbed
2850  * @param links_succeeded the number of overlay link connection attempts that
2851  *          succeeded
2852  * @param links_failed the number of overlay link connection attempts that
2853  *          failed
2854  */
2855 static void
2856 run (void *cls,
2857      struct GNUNET_TESTBED_RunHandle *h,
2858      unsigned int n_peers,
2859      struct GNUNET_TESTBED_Peer **peers,
2860      unsigned int links_succeeded,
2861      unsigned int links_failed)
2862 {
2863   (void) cls;
2864   (void) h;
2865   (void) links_failed;
2866   unsigned int i;
2867   struct OpListEntry *entry;
2868
2869   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "RUN was called\n");
2870
2871   /* Check whether we timed out */
2872   if ((n_peers != num_peers) ||
2873       (NULL == peers) ||
2874       (0 == links_succeeded) )
2875   {
2876     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2877                 "Going down due to args (eg. timeout)\n");
2878     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tn_peers: %u\n", n_peers);
2879     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tnum_peers: %" PRIu32 "\n",
2880                 num_peers);
2881     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tpeers: %p\n", peers);
2882     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tlinks_succeeded: %u\n",
2883                 links_succeeded);
2884     GNUNET_SCHEDULER_shutdown ();
2885     return;
2886   }
2887
2888
2889   /* Initialize peers */
2890   testbed_peers = peers;
2891   num_peers_online = 0;
2892   for (i = 0; i < num_peers; i++)
2893   {
2894     entry = make_oplist_entry ();
2895     entry->index = i;
2896     rps_peers[i].index = i;
2897     if (NULL != cur_test_run.init_peer)
2898       cur_test_run.init_peer (&rps_peers[i]);
2899     if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
2900     {
2901       rps_peers->cur_view_count = 0;
2902       rps_peers->cur_view = NULL;
2903     }
2904     entry->op = GNUNET_TESTBED_peer_get_information (peers[i],
2905                                                      GNUNET_TESTBED_PIT_IDENTITY,
2906                                                      &info_cb,
2907                                                      entry);
2908   }
2909
2910   /* Bring peers up */
2911   GNUNET_assert (num_peers == n_peers);
2912   for (i = 0; i < n_peers; i++)
2913   {
2914     rps_peers[i].index = i;
2915     rps_peers[i].op =
2916       GNUNET_TESTBED_service_connect (&rps_peers[i],
2917                                       peers[i],
2918                                       "rps",
2919                                       &rps_connect_complete_cb,
2920                                       &rps_peers[i],
2921                                       &rps_connect_adapter,
2922                                       &rps_disconnect_adapter,
2923                                       &rps_peers[i]);
2924     /* Connect all peers to statistics service */
2925     if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics)
2926     {
2927       rps_peers[i].stat_op =
2928         GNUNET_TESTBED_service_connect (NULL,
2929                                         peers[i],
2930                                         "statistics",
2931                                         stat_complete_cb,
2932                                         &rps_peers[i],
2933                                         &stat_connect_adapter,
2934                                         &stat_disconnect_adapter,
2935                                         &rps_peers[i]);
2936     }
2937   }
2938
2939   if (NULL != churn_task)
2940     GNUNET_SCHEDULER_cancel (churn_task);
2941   post_test_task = GNUNET_SCHEDULER_add_delayed (timeout, &post_test_op, NULL);
2942   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
2943                                            (timeout_s * 1.2) + 0.1 * num_peers);
2944   shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
2945 }
2946
2947
2948 /**
2949  * Entry point for the testcase, sets up the testbed.
2950  *
2951  * @param argc unused
2952  * @param argv unused
2953  * @return 0 on success
2954  */
2955 int
2956 main (int argc, char *argv[])
2957 {
2958   int ret_value;
2959
2960   (void) argc;
2961
2962   /* Defaults for tests */
2963   num_peers = 5;
2964   cur_test_run.name = "test-rps-default";
2965   cur_test_run.init_peer = default_init_peer;
2966   cur_test_run.pre_test = NULL;
2967   cur_test_run.reply_handle = default_reply_handle;
2968   cur_test_run.eval_cb = default_eval_cb;
2969   cur_test_run.post_test = NULL;
2970   cur_test_run.have_churn = HAVE_CHURN;
2971   cur_test_run.have_collect_statistics = NO_COLLECT_STATISTICS;
2972   cur_test_run.stat_collect_flags = 0;
2973   cur_test_run.have_collect_view = NO_COLLECT_VIEW;
2974   churn_task = NULL;
2975   timeout_s = 30;
2976
2977   if (strstr (argv[0], "malicious") != NULL)
2978   {
2979     cur_test_run.pre_test = mal_pre;
2980     cur_test_run.main_test = mal_cb;
2981     cur_test_run.init_peer = mal_init_peer;
2982     timeout_s = 40;
2983
2984     if (strstr (argv[0], "_1") != NULL)
2985     {
2986       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 1\n");
2987       cur_test_run.name = "test-rps-malicious_1";
2988       mal_type = 1;
2989     }
2990     else if (strstr (argv[0], "_2") != NULL)
2991     {
2992       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 2\n");
2993       cur_test_run.name = "test-rps-malicious_2";
2994       mal_type = 2;
2995     }
2996     else if (strstr (argv[0], "_3") != NULL)
2997     {
2998       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 3\n");
2999       cur_test_run.name = "test-rps-malicious_3";
3000       mal_type = 3;
3001     }
3002   }
3003
3004   else if (strstr (argv[0], "_single_req") != NULL)
3005   {
3006     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test single request\n");
3007     cur_test_run.name = "test-rps-single-req";
3008     cur_test_run.main_test = single_req_cb;
3009     cur_test_run.have_churn = HAVE_NO_CHURN;
3010   }
3011
3012   else if (strstr (argv[0], "_delayed_reqs") != NULL)
3013   {
3014     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test delayed requests\n");
3015     cur_test_run.name = "test-rps-delayed-reqs";
3016     cur_test_run.main_test = delay_req_cb;
3017     cur_test_run.have_churn = HAVE_NO_CHURN;
3018   }
3019
3020   else if (strstr (argv[0], "_seed_big") != NULL)
3021   {
3022     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3023                 "Test seeding (num_peers > GNUNET_MAX_MESSAGE_SIZE)\n");
3024     num_peers = 1;
3025     cur_test_run.name = "test-rps-seed-big";
3026     cur_test_run.main_test = seed_big_cb;
3027     cur_test_run.eval_cb = no_eval;
3028     cur_test_run.have_churn = HAVE_NO_CHURN;
3029     timeout_s = 10;
3030   }
3031
3032   else if (strstr (argv[0], "_single_peer_seed") != NULL)
3033   {
3034     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3035                 "Test seeding and requesting on a single peer\n");
3036     cur_test_run.name = "test-rps-single-peer-seed";
3037     cur_test_run.main_test = single_peer_seed_cb;
3038     cur_test_run.have_churn = HAVE_NO_CHURN;
3039   }
3040
3041   else if (strstr (argv[0], "_seed_request") != NULL)
3042   {
3043     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3044                 "Test seeding and requesting on multiple peers\n");
3045     cur_test_run.name = "test-rps-seed-request";
3046     cur_test_run.main_test = seed_req_cb;
3047     cur_test_run.have_churn = HAVE_NO_CHURN;
3048   }
3049
3050   else if (strstr (argv[0], "_seed") != NULL)
3051   {
3052     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding\n");
3053     cur_test_run.name = "test-rps-seed";
3054     cur_test_run.main_test = seed_cb;
3055     cur_test_run.eval_cb = no_eval;
3056     cur_test_run.have_churn = HAVE_NO_CHURN;
3057   }
3058
3059   else if (strstr (argv[0], "_req_cancel") != NULL)
3060   {
3061     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n");
3062     cur_test_run.name = "test-rps-req-cancel";
3063     num_peers = 1;
3064     cur_test_run.main_test = req_cancel_cb;
3065     cur_test_run.eval_cb = no_eval;
3066     cur_test_run.have_churn = HAVE_NO_CHURN;
3067     timeout_s = 10;
3068   }
3069
3070   else if (strstr (argv[0], "_churn") != NULL)
3071   {
3072     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test churn\n");
3073     cur_test_run.name = "test-rps-churn";
3074     num_peers = 5;
3075     cur_test_run.init_peer = default_init_peer;
3076     cur_test_run.main_test = churn_test_cb;
3077     cur_test_run.reply_handle = default_reply_handle;
3078     cur_test_run.eval_cb = default_eval_cb;
3079     cur_test_run.have_churn = HAVE_NO_CHURN;
3080     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
3081     timeout_s = 40;
3082   }
3083
3084   else if (strstr (argv[0], "_sub") != NULL)
3085   {
3086     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test subs\n");
3087     cur_test_run.name = "test-rps-sub";
3088     num_peers = 5;
3089     // cur_test_run.init_peer = &default_init_peer;
3090     cur_test_run.pre_test = &sub_pre;
3091     cur_test_run.main_test = &single_req_cb;
3092     // cur_test_run.reply_handle = default_reply_handle;
3093     cur_test_run.post_test = &sub_post;
3094     // cur_test_run.eval_cb = default_eval_cb;
3095     cur_test_run.have_churn = HAVE_NO_CHURN;
3096     cur_test_run.have_quick_quit = HAVE_QUICK_QUIT;
3097   }
3098
3099   else if (strstr (argv[0], "profiler") != NULL)
3100   {
3101     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
3102     cur_test_run.name = "test-rps-profiler";
3103     num_peers = 16;
3104     mal_type = 3;
3105     cur_test_run.init_peer = profiler_init_peer;
3106     // cur_test_run.pre_test = mal_pre;
3107     cur_test_run.pre_test = pre_profiler;
3108     cur_test_run.main_test = profiler_cb;
3109     cur_test_run.reply_handle = profiler_reply_handle;
3110     cur_test_run.eval_cb = profiler_eval;
3111     cur_test_run.post_test = post_profiler;
3112     cur_test_run.request_interval = 2;
3113     cur_test_run.num_requests = 5;
3114     // cur_test_run.have_churn = HAVE_CHURN;
3115     cur_test_run.have_churn = HAVE_NO_CHURN;
3116     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
3117     cur_test_run.have_collect_statistics = COLLECT_STATISTICS;
3118     cur_test_run.stat_collect_flags = STAT_TYPE_ROUNDS
3119                                       | STAT_TYPE_BLOCKS
3120                                       | STAT_TYPE_BLOCKS_MANY_PUSH
3121                                       | STAT_TYPE_BLOCKS_NO_PUSH
3122                                       | STAT_TYPE_BLOCKS_NO_PULL
3123                                       | STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL
3124                                       | STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL
3125                                       | STAT_TYPE_ISSUED_PUSH_SEND
3126                                       | STAT_TYPE_ISSUED_PULL_REQ
3127                                       | STAT_TYPE_ISSUED_PULL_REP
3128                                       | STAT_TYPE_SENT_PUSH_SEND
3129                                       | STAT_TYPE_SENT_PULL_REQ
3130                                       | STAT_TYPE_SENT_PULL_REP
3131                                       | STAT_TYPE_RECV_PUSH_SEND
3132                                       | STAT_TYPE_RECV_PULL_REQ
3133                                       | STAT_TYPE_RECV_PULL_REP;
3134     cur_test_run.have_collect_view = COLLECT_VIEW;
3135     timeout_s = 150;
3136
3137     /* 'Clean' directory */
3138     (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
3139     GNUNET_DISK_directory_create ("/tmp/rps/");
3140   }
3141   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, timeout_s);
3142
3143   rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
3144   peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
3145   rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
3146   if ((2 == mal_type) ||
3147       (3 == mal_type))
3148     target_peer = &rps_peer_ids[num_peers - 2];
3149   if (profiler_eval == cur_test_run.eval_cb)
3150     eval_peer = &rps_peers[num_peers - 1];    /* FIXME: eval_peer could be a
3151                                                  malicious peer if not careful
3152                                                  with the malicious portion */
3153
3154   ok = 1;
3155   ret_value = GNUNET_TESTBED_test_run (cur_test_run.name,
3156                                        "test_rps.conf",
3157                                        num_peers,
3158                                        0, NULL, NULL,
3159                                        &run, NULL);
3160   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3161               "_test_run returned.\n");
3162   if (GNUNET_OK != ret_value)
3163   {
3164     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3165                 "Test did not run successfully!\n");
3166   }
3167
3168   ret_value = cur_test_run.eval_cb ();
3169
3170   if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
3171   {
3172     GNUNET_array_grow (rps_peers->cur_view,
3173                        rps_peers->cur_view_count,
3174                        0);
3175   }
3176   GNUNET_free (rps_peers);
3177   GNUNET_free (rps_peer_ids);
3178   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
3179   return ret_value;
3180 }
3181
3182
3183 /* end of test_rps.c */