c958194a8e271c2c66a14ea736ee9987e77918d6
[oweals/gnunet.git] / src / rps / test_rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2012 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file rps/test_rps.c
22  * @brief Testcase for the random peer sampling service.  Starts
23  *        a peergroup with a given number of peers, then waits to
24  *        receive size pushes/pulls from each peer.  Expects to wait
25  *        for one message from each peer.
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_testbed_service.h"
30
31 #include "gnunet_rps_service.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler_elem.h"
34
35 #include <inttypes.h>
36
37
38 /**
39  * How many peers do we start?
40  */
41 static uint32_t num_peers;
42
43 /**
44  * How many peers are ready to shutdown?
45  */
46 static uint32_t num_shutdown_ready;
47
48 /**
49  * How long do we run the test?
50  */
51 //#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
52 static struct GNUNET_TIME_Relative timeout;
53
54
55 /**
56  * Portion of malicious peers
57  */
58 static double portion = .1;
59
60 /**
61  * Type of malicious peer to test
62  */
63 static unsigned int mal_type = 0;
64
65 /**
66  * Handles to all of the running peers
67  */
68 static struct GNUNET_TESTBED_Peer **testbed_peers;
69
70 /**
71  * @brief Indicates whether peer should go off- or online
72  */
73 enum PEER_ONLINE_DELTA {
74   /**
75    * @brief Indicates peer going online
76    */
77   PEER_GO_ONLINE = 1,
78   /**
79    * @brief Indicates peer going offline
80    */
81   PEER_GO_OFFLINE = -1,
82 };
83
84 /**
85  * Operation map entry
86  */
87 struct OpListEntry
88 {
89   /**
90    * DLL next ptr
91    */
92   struct OpListEntry *next;
93
94   /**
95    * DLL prev ptr
96    */
97   struct OpListEntry *prev;
98
99   /**
100    * The testbed operation
101    */
102   struct GNUNET_TESTBED_Operation *op;
103
104   /**
105    * Depending on whether we start or stop RPS service at the peer, set this to
106    * #PEER_GO_ONLINE (1) or #PEER_GO_OFFLINE (-1)
107    */
108   enum PEER_ONLINE_DELTA delta;
109
110   /**
111    * Index of the regarding peer
112    */
113   unsigned int index;
114 };
115
116 /**
117  * OpList DLL head
118  */
119 static struct OpListEntry *oplist_head;
120
121 /**
122  * OpList DLL tail
123  */
124 static struct OpListEntry *oplist_tail;
125
126
127 /**
128  * A pending reply: A request was sent and the reply is pending.
129  */
130 struct PendingReply
131 {
132   /**
133    * DLL next,prev ptr
134    */
135   struct PendingReply *next;
136   struct PendingReply *prev;
137
138   /**
139    * Handle to the request we are waiting for
140    */
141   struct GNUNET_RPS_Request_Handle *req_handle;
142
143   /**
144    * The peer that requested
145    */
146   struct RPSPeer *rps_peer;
147 };
148
149
150 /**
151  * A pending request: A request was not made yet but is scheduled for later.
152  */
153 struct PendingRequest
154 {
155   /**
156    * DLL next,prev ptr
157    */
158   struct PendingRequest *next;
159   struct PendingRequest *prev;
160
161   /**
162    * Handle to the request we are waiting for
163    */
164   struct GNUNET_SCHEDULER_Task *request_task;
165
166   /**
167    * The peer that requested
168    */
169   struct RPSPeer *rps_peer;
170 };
171
172
173 /**
174  * Information we track for each peer.
175  */
176 struct RPSPeer
177 {
178   /**
179    * Index of the peer.
180    */
181   unsigned int index;
182
183   /**
184    * Handle for RPS connect operation.
185    */
186   struct GNUNET_TESTBED_Operation *op;
187
188   /**
189    * Handle to RPS service.
190    */
191   struct GNUNET_RPS_Handle *rps_handle;
192
193   /**
194    * ID of the peer.
195    */
196   struct GNUNET_PeerIdentity *peer_id;
197
198   /**
199    * A request handle to check for an request
200    */
201   //struct GNUNET_RPS_Request_Handle *req_handle;
202
203   /**
204    * Peer on- or offline?
205    */
206   int online;
207
208   /**
209    * Number of Peer IDs to request during the whole test
210    */
211   unsigned int num_ids_to_request;
212
213   /**
214    * Pending requests DLL
215    */
216   struct PendingRequest *pending_req_head;
217   struct PendingRequest *pending_req_tail;
218
219   /**
220    * Number of pending requests
221    */
222   unsigned int num_pending_reqs;
223
224   /**
225    * Pending replies DLL
226    */
227   struct PendingReply *pending_rep_head;
228   struct PendingReply *pending_rep_tail;
229
230   /**
231    * Number of pending replies
232    */
233   unsigned int num_pending_reps;
234
235   /**
236    * Number of received PeerIDs
237    */
238   unsigned int num_recv_ids;
239
240   /**
241    * Pending operation on that peer
242    */
243   const struct OpListEntry *entry_op_manage;
244
245   /**
246    * Testbed operation to connect to statistics service
247    */
248   struct GNUNET_TESTBED_Operation *stat_op;
249
250   /**
251    * Handle to the statistics service
252    */
253   struct GNUNET_STATISTICS_Handle *stats_h;
254 };
255
256
257 /**
258  * Information for all the peers.
259  */
260 static struct RPSPeer *rps_peers;
261
262 /**
263  * Peermap to get the index of a given peer ID quick.
264  */
265 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
266
267 /**
268  * IDs of the peers.
269  */
270 static struct GNUNET_PeerIdentity *rps_peer_ids;
271
272 /**
273  * ID of the targeted peer.
274  */
275 static struct GNUNET_PeerIdentity *target_peer;
276
277 /**
278  * ID of the peer that requests for the evaluation.
279  */
280 static struct RPSPeer *eval_peer;
281
282 /**
283  * Number of online peers.
284  */
285 static unsigned int num_peers_online;
286
287 /**
288  * Return value from 'main'.
289  */
290 static int ok;
291
292 /**
293  * Identifier for the churn task that runs periodically
294  */
295 static struct GNUNET_SCHEDULER_Task *shutdown_task;
296
297 /**
298  * Identifier for the churn task that runs periodically
299  */
300 static struct GNUNET_SCHEDULER_Task *churn_task;
301
302 /**
303  * Called to initialise the given RPSPeer
304  */
305 typedef void (*InitPeer) (struct RPSPeer *rps_peer);
306
307 /**
308  * @brief Called directly after connecting to the service
309  *
310  * @param rps_peer Specific peer the function is called on
311  * @param h the handle to the rps service
312  */
313 typedef void (*PreTest) (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h);
314
315 /**
316  * @brief Executes functions to test the api/service for a given peer
317  *
318  * Called from within #rps_connect_complete_cb ()
319  * Implemented by #churn_test_cb, #profiler_cb, #mal_cb, #single_req_cb,
320  * #delay_req_cb, #seed_big_cb, #single_peer_seed_cb, #seed_cb, #req_cancel_cb
321  *
322  * @param rps_peer the peer the task runs on
323  */
324 typedef void (*MainTest) (struct RPSPeer *rps_peer);
325
326 /**
327  * Callback called once the requested random peers are available
328  */
329 typedef void (*ReplyHandle) (void *cls,
330                              uint64_t n,
331                              const struct GNUNET_PeerIdentity *recv_peers);
332
333 /**
334  * Called directly before disconnecting from the service
335  */
336 typedef void (*PostTest) (const struct RPSPeer *peer);
337
338 /**
339  * Function called after disconnect to evaluate test success
340  */
341 typedef int (*EvaluationCallback) (void);
342
343 /**
344  * @brief Do we have Churn?
345  */
346 enum OPTION_CHURN {
347   /**
348    * @brief If we have churn this is set
349    */
350   HAVE_CHURN,
351   /**
352    * @brief If we have no churn this is set
353    */
354   HAVE_NO_CHURN,
355 };
356
357 /**
358  * @brief Is it ok to quit the test before the timeout?
359  */
360 enum OPTION_QUICK_QUIT {
361   /**
362    * @brief It is ok for the test to quit before the timeout triggers
363    */
364   HAVE_QUICK_QUIT,
365
366   /**
367    * @brief It is NOT ok for the test to quit before the timeout triggers
368    */
369   HAVE_NO_QUICK_QUIT,
370 };
371
372 /**
373  * @brief Do we collect statistics at the end?
374  */
375 enum OPTION_COLLECT_STATISTICS {
376   /**
377    * @brief We collect statistics at the end
378    */
379   COLLECT_STATISTICS,
380
381   /**
382    * @brief We do not collect statistics at the end
383    */
384   NO_COLLECT_STATISTICS,
385 };
386
387 /**
388  * Structure to define a single test
389  */
390 struct SingleTestRun
391 {
392   /**
393    * Name of the test
394    */
395   char *name;
396
397   /**
398    * Called with a single peer in order to initialise that peer
399    */
400   InitPeer init_peer;
401
402   /**
403    * Called directly after connecting to the service
404    */
405   PreTest pre_test;
406
407   /**
408    * Main function for each peer
409    */
410   MainTest main_test;
411
412   /**
413    * Callback called once the requested peers are available
414    */
415   ReplyHandle reply_handle;
416
417   /**
418    * Called directly before disconnecting from the service
419    */
420   PostTest post_test;
421
422   /**
423    * Function to evaluate the test results
424    */
425   EvaluationCallback eval_cb;
426
427   /**
428    * Request interval
429    */
430   uint32_t request_interval;
431
432   /**
433    * Number of Requests to make.
434    */
435   uint32_t num_requests;
436
437   /**
438    * Run with (-out) churn
439    */
440   enum OPTION_CHURN have_churn;
441
442   /**
443    * Quit test before timeout?
444    */
445   enum OPTION_QUICK_QUIT have_quick_quit;
446
447   /**
448    * Collect statistics at the end?
449    */
450   enum OPTION_COLLECT_STATISTICS have_collect_statistics;
451 } cur_test_run;
452
453 /**
454  * Are we shutting down?
455  */
456 static int in_shutdown;
457
458 /**
459  * Append arguments to file
460  */
461 static void
462 tofile_ (const char *file_name, const char *line)
463 {
464   struct GNUNET_DISK_FileHandle *f;
465   /* char output_buffer[512]; */
466   size_t size;
467   /* int size; */
468   size_t size2;
469
470   if (NULL == (f = GNUNET_DISK_file_open (file_name,
471                                           GNUNET_DISK_OPEN_APPEND |
472                                           GNUNET_DISK_OPEN_WRITE |
473                                           GNUNET_DISK_OPEN_CREATE,
474                                           GNUNET_DISK_PERM_USER_READ |
475                                           GNUNET_DISK_PERM_USER_WRITE |
476                                           GNUNET_DISK_PERM_GROUP_READ |
477                                           GNUNET_DISK_PERM_OTHER_READ)))
478   {
479     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
480                 "Not able to open file %s\n",
481                 file_name);
482     return;
483   }
484   /* size = GNUNET_snprintf (output_buffer,
485                           sizeof (output_buffer),
486                           "%llu %s\n",
487                           GNUNET_TIME_absolute_get ().abs_value_us,
488                           line);
489   if (0 > size)
490   {
491     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
492                 "Failed to write string to buffer (size: %i)\n",
493                 size);
494     return;
495   } */
496
497   size = strlen (line) * sizeof (char);
498
499   size2 = GNUNET_DISK_file_write (f, line, size);
500   if (size != size2)
501   {
502     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
503                 "Unable to write to file! (Size: %lu, size2: %lu)\n",
504                 size,
505                 size2);
506     if (GNUNET_YES != GNUNET_DISK_file_close (f))
507     {
508       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
509                   "Unable to close file\n");
510     }
511     return;
512   }
513
514   if (GNUNET_YES != GNUNET_DISK_file_close (f))
515   {
516     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
517                 "Unable to close file\n");
518   }
519 }
520
521 /**
522  * This function is used to facilitate writing important information to disk
523  */
524 #define tofile(file_name, ...) do {\
525   char tmp_buf[512];\
526     int size;\
527     size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
528     if (0 > size)\
529       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
530                      "Failed to create tmp_buf\n");\
531     else\
532       tofile_(file_name,tmp_buf);\
533   } while (0);
534
535
536 /**
537  * Write the ids and their according index in the given array to a file
538  * Unused
539  */
540 /* static void
541 ids_to_file (char *file_name,
542              struct GNUNET_PeerIdentity *peer_ids,
543              unsigned int num_peer_ids)
544 {
545   unsigned int i;
546
547   for (i=0 ; i < num_peer_ids ; i++)
548   {
549     to_file (file_name,
550              "%u\t%s",
551              i,
552              GNUNET_i2s_full (&peer_ids[i]));
553   }
554 } */
555
556 /**
557  * Test the success of a single test
558  */
559 static int
560 evaluate (void)
561 {
562   unsigned int i;
563   int tmp_ok;
564
565   tmp_ok = 1;
566
567   for (i = 0; i < num_peers; i++)
568   {
569     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
570         "%u. peer [%s] received %u of %u expected peer_ids: %i\n",
571         i,
572         GNUNET_i2s (rps_peers[i].peer_id),
573         rps_peers[i].num_recv_ids,
574         rps_peers[i].num_ids_to_request,
575         (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids));
576     tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids);
577   }
578   return tmp_ok? 0 : 1;
579 }
580
581
582 /**
583  * Creates an oplist entry and adds it to the oplist DLL
584  */
585 static struct OpListEntry *
586 make_oplist_entry ()
587 {
588   struct OpListEntry *entry;
589
590   entry = GNUNET_new (struct OpListEntry);
591   GNUNET_CONTAINER_DLL_insert_tail (oplist_head, oplist_tail, entry);
592   return entry;
593 }
594
595
596 /**
597  * Task run on timeout to shut everything down.
598  */
599 static void
600 shutdown_op (void *cls)
601 {
602   unsigned int i;
603
604   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
605               "Shutdown task scheduled, going down.\n");
606   in_shutdown = GNUNET_YES;
607   if (NULL != churn_task)
608   {
609     GNUNET_SCHEDULER_cancel (churn_task);
610     churn_task = NULL;
611   }
612   for (i = 0; i < num_peers; i++)
613   {
614     if (NULL != rps_peers[i].op)
615       GNUNET_TESTBED_operation_done (rps_peers[i].op);
616     if (NULL != cur_test_run.post_test)
617     {
618       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n", i);
619       cur_test_run.post_test (&rps_peers[i]);
620     }
621   }
622   /* If we do not collect statistics, shut down directly */
623   if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics ||
624       num_peers <= num_shutdown_ready)
625   {
626     GNUNET_SCHEDULER_shutdown ();
627   }
628 }
629
630
631 /**
632  * Seed peers.
633  */
634 static void
635 seed_peers (void *cls)
636 {
637   struct RPSPeer *peer = cls;
638   unsigned int amount;
639   unsigned int i;
640
641   // TODO if malicious don't seed mal peers
642   amount = round (.5 * num_peers);
643
644   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding peers:\n");
645   for (i = 0 ; i < amount ; i++)
646     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
647                 i,
648                 GNUNET_i2s (&rps_peer_ids[i]));
649
650   GNUNET_RPS_seed_ids (peer->rps_handle, amount, rps_peer_ids);
651 }
652
653
654 /**
655  * Seed peers.
656  */
657 static void
658 seed_peers_big (void *cls)
659 {
660   struct RPSPeer *peer = cls;
661   unsigned int seed_msg_size;
662   uint32_t num_peers_max;
663   unsigned int amount;
664   unsigned int i;
665
666   seed_msg_size = 8; /* sizeof (struct GNUNET_RPS_CS_SeedMessage) */
667   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE - seed_msg_size) /
668     sizeof (struct GNUNET_PeerIdentity);
669   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
670       "Peers that fit in one seed msg; %u\n",
671       num_peers_max);
672   amount = num_peers_max + (0.5 * num_peers_max);
673   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674       "Seeding many (%u) peers:\n",
675       amount);
676   struct GNUNET_PeerIdentity ids_to_seed[amount];
677   for (i = 0; i < amount; i++)
678   {
679     ids_to_seed[i] = *peer->peer_id;
680     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
681                 i,
682                 GNUNET_i2s (&ids_to_seed[i]));
683   }
684
685   GNUNET_RPS_seed_ids (peer->rps_handle, amount, ids_to_seed);
686 }
687
688 /**
689  * Get the id of peer i.
690  */
691   void
692 info_cb (void *cb_cls,
693          struct GNUNET_TESTBED_Operation *op,
694          const struct GNUNET_TESTBED_PeerInformation *pinfo,
695          const char *emsg)
696 {
697   struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
698
699   if (GNUNET_YES == in_shutdown)
700   {
701     return;
702   }
703
704   if (NULL == pinfo || NULL != emsg)
705   {
706     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg);
707     GNUNET_TESTBED_operation_done (entry->op);
708     return;
709   }
710
711   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
712               "Peer %u is %s\n",
713               entry->index,
714               GNUNET_i2s (pinfo->result.id));
715
716   rps_peer_ids[entry->index] = *(pinfo->result.id);
717   rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index];
718
719   GNUNET_assert (GNUNET_OK ==
720       GNUNET_CONTAINER_multipeermap_put (peer_map,
721         &rps_peer_ids[entry->index],
722         &rps_peers[entry->index],
723         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
724   tofile ("/tmp/rps/peer_ids",
725            "%u\t%s\n",
726            entry->index,
727            GNUNET_i2s_full (&rps_peer_ids[entry->index]));
728
729   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
730   GNUNET_TESTBED_operation_done (entry->op);
731   GNUNET_free (entry);
732 }
733
734
735 /**
736  * Callback to be called when RPS service connect operation is completed
737  *
738  * @param cls the callback closure from functions generating an operation
739  * @param op the operation that has been finished
740  * @param ca_result the RPS service handle returned from rps_connect_adapter
741  * @param emsg error message in case the operation has failed; will be NULL if
742  *          operation has executed successfully.
743  */
744 static void
745 rps_connect_complete_cb (void *cls,
746                          struct GNUNET_TESTBED_Operation *op,
747                          void *ca_result,
748                          const char *emsg)
749 {
750   struct RPSPeer *rps_peer = cls;
751   struct GNUNET_RPS_Handle *rps = ca_result;
752
753   if (GNUNET_YES == in_shutdown)
754   {
755     return;
756   }
757
758   rps_peer->rps_handle = rps;
759   rps_peer->online = GNUNET_YES;
760   num_peers_online++;
761
762   GNUNET_assert (op == rps_peer->op);
763   if (NULL != emsg)
764   {
765     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
766                 "Failed to connect to RPS service: %s\n",
767                 emsg);
768     ok = 1;
769     GNUNET_SCHEDULER_shutdown ();
770     return;
771   }
772
773   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Started client successfully\n");
774
775   cur_test_run.main_test (rps_peer);
776 }
777
778
779 /**
780  * Adapter function called to establish a connection to
781  * the RPS service.
782  *
783  * @param cls closure
784  * @param cfg configuration of the peer to connect to; will be available until
785  *          GNUNET_TESTBED_operation_done() is called on the operation returned
786  *          from GNUNET_TESTBED_service_connect()
787  * @return service handle to return in 'op_result', NULL on error
788  */
789 static void *
790 rps_connect_adapter (void *cls,
791                                  const struct GNUNET_CONFIGURATION_Handle *cfg)
792 {
793   struct GNUNET_RPS_Handle *h;
794
795   h = GNUNET_RPS_connect (cfg);
796
797   if (NULL != cur_test_run.pre_test)
798     cur_test_run.pre_test (cls, h);
799
800   return h;
801 }
802
803 /**
804  * Called to open a connection to the peer's statistics
805  *
806  * @param cls peer context
807  * @param cfg configuration of the peer to connect to; will be available until
808  *          GNUNET_TESTBED_operation_done() is called on the operation returned
809  *          from GNUNET_TESTBED_service_connect()
810  * @return service handle to return in 'op_result', NULL on error
811  */
812 static void *
813 stat_connect_adapter (void *cls,
814                       const struct GNUNET_CONFIGURATION_Handle *cfg)
815 {
816   struct RPSPeer *peer = cls;
817
818   peer->stats_h = GNUNET_STATISTICS_create ("rps-profiler", cfg);
819   return peer->stats_h;
820 }
821
822 /**
823  * Called to disconnect from peer's statistics service
824  *
825  * @param cls peer context
826  * @param op_result service handle returned from the connect adapter
827  */
828 static void
829 stat_disconnect_adapter (void *cls, void *op_result)
830 {
831   struct RPSPeer *peer = cls;
832
833   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
834   //              (peer->stats_h, "core", "# peers connected",
835   //               stat_iterator, peer));
836   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
837   //              (peer->stats_h, "nse", "# peers connected",
838   //               stat_iterator, peer));
839   GNUNET_STATISTICS_destroy (op_result, GNUNET_NO);
840   peer->stats_h = NULL;
841 }
842
843 /**
844  * Called after successfully opening a connection to a peer's statistics
845  * service; we register statistics monitoring for CORE and NSE here.
846  *
847  * @param cls the callback closure from functions generating an operation
848  * @param op the operation that has been finished
849  * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
850  * @param emsg error message in case the operation has failed; will be NULL if
851  *          operation has executed successfully.
852  */
853 static void
854 stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
855                   void *ca_result, const char *emsg )
856 {
857   //struct GNUNET_STATISTICS_Handle *sh = ca_result;
858   //struct RPSPeer *peer = (struct RPSPeer *) cls;
859
860   if (NULL != emsg)
861   {
862     GNUNET_break (0);
863     return;
864   }
865   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
866   //              (sh, "core", "# peers connected",
867   //               stat_iterator, peer));
868   //GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
869   //              (sh, "nse", "# peers connected",
870   //               stat_iterator, peer));
871 }
872
873
874 /**
875  * Adapter function called to destroy connection to
876  * RPS service.
877  *
878  * @param cls closure
879  * @param op_result service handle returned from the connect adapter
880  */
881 static void
882 rps_disconnect_adapter (void *cls,
883                                           void *op_result)
884 {
885   struct RPSPeer *peer = cls;
886   struct GNUNET_RPS_Handle *h = op_result;
887   GNUNET_assert (NULL != peer);
888   GNUNET_RPS_disconnect (h);
889   peer->rps_handle = NULL;
890 }
891
892
893 /***********************************************************************
894  * Definition of tests
895 ***********************************************************************/
896
897 // TODO check whether tests can be stopped earlier
898 static int
899 default_eval_cb (void)
900 {
901   return evaluate ();
902 }
903
904 static int
905 no_eval (void)
906 {
907   return 0;
908 }
909
910 /**
911  * Initialise given RPSPeer
912  */
913 static void default_init_peer (struct RPSPeer *rps_peer)
914 {
915   rps_peer->num_ids_to_request = 1;
916 }
917
918 /**
919  * Callback to call on receipt of a reply
920  *
921  * @param cls closure
922  * @param n number of peers
923  * @param recv_peers the received peers
924  */
925 static void
926 default_reply_handle (void *cls,
927                       uint64_t n,
928                       const struct GNUNET_PeerIdentity *recv_peers)
929 {
930   struct RPSPeer *rps_peer;
931   struct PendingReply *pending_rep = (struct PendingReply *) cls;
932   unsigned int i;
933
934   rps_peer = pending_rep->rps_peer;
935   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
936                                rps_peer->pending_rep_tail,
937                                pending_rep);
938   rps_peer->num_pending_reps--;
939   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940               "[%s] got %" PRIu64 " peers:\n",
941               GNUNET_i2s (rps_peer->peer_id),
942               n);
943
944   for (i = 0; i < n; i++)
945   {
946     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947                 "%u: %s\n",
948                 i,
949                 GNUNET_i2s (&recv_peers[i]));
950
951     rps_peer->num_recv_ids++;
952   }
953
954   if (0 == evaluate () && HAVE_QUICK_QUIT == cur_test_run.have_quick_quit)
955   {
956     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test succeeded before timeout\n");
957     GNUNET_assert (NULL != shutdown_task);
958     GNUNET_SCHEDULER_cancel (shutdown_task);
959     shutdown_task = GNUNET_SCHEDULER_add_now (&shutdown_op, NULL);
960     GNUNET_assert (NULL!= shutdown_task);
961   }
962 }
963
964 /**
965  * Request random peers.
966  */
967 static void
968 request_peers (void *cls)
969 {
970   struct PendingRequest *pending_req = cls;
971   struct RPSPeer *rps_peer;
972   struct PendingReply *pending_rep;
973
974   if (GNUNET_YES == in_shutdown)
975     return;
976   rps_peer = pending_req->rps_peer;
977   GNUNET_assert (1 <= rps_peer->num_pending_reqs);
978   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
979                                rps_peer->pending_req_tail,
980                                pending_req);
981   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
982               "Requesting one peer\n");
983   pending_rep = GNUNET_new (struct PendingReply);
984   pending_rep->rps_peer = rps_peer;
985   pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
986       1,
987       cur_test_run.reply_handle,
988       pending_rep);
989   GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
990                                     rps_peer->pending_rep_tail,
991                                     pending_rep);
992   rps_peer->num_pending_reps++;
993   rps_peer->num_pending_reqs--;
994 }
995
996 static void
997 cancel_pending_req (struct PendingRequest *pending_req)
998 {
999   struct RPSPeer *rps_peer;
1000
1001   rps_peer = pending_req->rps_peer;
1002   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
1003                                rps_peer->pending_req_tail,
1004                                pending_req);
1005   rps_peer->num_pending_reqs--;
1006   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1007               "Cancelling pending request\n");
1008   GNUNET_SCHEDULER_cancel (pending_req->request_task);
1009   GNUNET_free (pending_req);
1010 }
1011
1012 static void
1013 cancel_request (struct PendingReply *pending_rep)
1014 {
1015   struct RPSPeer *rps_peer;
1016
1017   rps_peer = pending_rep->rps_peer;
1018   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1019                                rps_peer->pending_rep_tail,
1020                                pending_rep);
1021   rps_peer->num_pending_reps--;
1022   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1023               "Cancelling request\n");
1024   GNUNET_RPS_request_cancel (pending_rep->req_handle);
1025   GNUNET_free (pending_rep);
1026 }
1027
1028 /**
1029  * Cancel a request.
1030  */
1031 static void
1032 cancel_request_cb (void *cls)
1033 {
1034   struct RPSPeer *rps_peer = cls;
1035   struct PendingReply *pending_rep;
1036
1037   if (GNUNET_YES == in_shutdown)
1038     return;
1039   pending_rep = rps_peer->pending_rep_head;
1040   GNUNET_assert (1 <= rps_peer->num_pending_reps);
1041   cancel_request (pending_rep);
1042 }
1043
1044
1045 /**
1046  * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
1047  * issued, nor replied
1048  */
1049 void
1050 schedule_missing_requests (struct RPSPeer *rps_peer)
1051 {
1052   unsigned int i;
1053   struct PendingRequest *pending_req;
1054
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056       "Scheduling %u - %u missing requests\n",
1057       rps_peer->num_ids_to_request,
1058       rps_peer->num_pending_reqs + rps_peer->num_pending_reps);
1059   GNUNET_assert (rps_peer->num_pending_reqs + rps_peer->num_pending_reps <=
1060       rps_peer->num_ids_to_request);
1061   for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
1062        i < rps_peer->num_ids_to_request; i++)
1063   {
1064     pending_req = GNUNET_new (struct PendingRequest);
1065     pending_req->rps_peer = rps_peer;
1066     pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
1067         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1068           cur_test_run.request_interval * i),
1069         request_peers,
1070         pending_req);
1071     GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
1072                                       rps_peer->pending_req_tail,
1073                                       pending_req);
1074     rps_peer->num_pending_reqs++;
1075   }
1076 }
1077
1078 void
1079 cancel_pending_req_rep (struct RPSPeer *rps_peer)
1080 {
1081   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082       "Cancelling all (pending) requests.\n");
1083   while (NULL != rps_peer->pending_req_head)
1084     cancel_pending_req (rps_peer->pending_req_head);
1085   GNUNET_assert (0 == rps_peer->num_pending_reqs);
1086   while (NULL != rps_peer->pending_rep_head)
1087     cancel_request (rps_peer->pending_rep_head);
1088   GNUNET_assert (0 == rps_peer->num_pending_reps);
1089 }
1090
1091 /***********************************
1092  * MALICIOUS
1093 ***********************************/
1094
1095 /**
1096  * Initialise only non-mal RPSPeers
1097  */
1098 static void mal_init_peer (struct RPSPeer *rps_peer)
1099 {
1100   if (rps_peer->index >= round (portion * num_peers))
1101     rps_peer->num_ids_to_request = 1;
1102 }
1103
1104
1105 /**
1106  * @brief Set peers to (non-)malicious before execution
1107  *
1108  * Of signature #PreTest
1109  *
1110  * @param rps_peer the peer to set (non-) malicious
1111  * @param h the handle to the service
1112  */
1113 static void
1114 mal_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
1115 {
1116   #ifdef ENABLE_MALICIOUS
1117   uint32_t num_mal_peers;
1118
1119   GNUNET_assert ( (1 >= portion) &&
1120                   (0 <  portion) );
1121   num_mal_peers = round (portion * num_peers);
1122
1123   if (rps_peer->index < num_mal_peers)
1124   {
1125     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1126                 "%u. peer [%s] of %" PRIu32 " malicious peers turning malicious\n",
1127                 rps_peer->index,
1128                 GNUNET_i2s (rps_peer->peer_id),
1129                 num_mal_peers);
1130
1131     GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
1132                               rps_peer_ids, target_peer);
1133   }
1134   #endif /* ENABLE_MALICIOUS */
1135 }
1136
1137 static void
1138 mal_cb (struct RPSPeer *rps_peer)
1139 {
1140   uint32_t num_mal_peers;
1141
1142   if (GNUNET_YES == in_shutdown)
1143   {
1144     return;
1145   }
1146
1147   #ifdef ENABLE_MALICIOUS
1148   GNUNET_assert ( (1 >= portion) &&
1149                   (0 <  portion) );
1150   num_mal_peers = round (portion * num_peers);
1151
1152   if (rps_peer->index >= num_mal_peers)
1153   { /* It's useless to ask a malicious peer about a random sample -
1154        it's not sampling */
1155     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1156                                   seed_peers, rps_peer);
1157     schedule_missing_requests (rps_peer);
1158   }
1159   #endif /* ENABLE_MALICIOUS */
1160 }
1161
1162
1163 /***********************************
1164  * SINGLE_REQUEST
1165 ***********************************/
1166 static void
1167 single_req_cb (struct RPSPeer *rps_peer)
1168 {
1169   if (GNUNET_YES == in_shutdown)
1170   {
1171     return;
1172   }
1173
1174   schedule_missing_requests (rps_peer);
1175 }
1176
1177 /***********************************
1178  * DELAYED_REQUESTS
1179 ***********************************/
1180 static void
1181 delay_req_cb (struct RPSPeer *rps_peer)
1182 {
1183   if (GNUNET_YES == in_shutdown)
1184   {
1185     return;
1186   }
1187
1188   schedule_missing_requests (rps_peer);
1189 }
1190
1191 /***********************************
1192  * SEED
1193 ***********************************/
1194 static void
1195 seed_cb (struct RPSPeer *rps_peer)
1196 {
1197   if (GNUNET_YES == in_shutdown)
1198   {
1199     return;
1200   }
1201
1202   GNUNET_SCHEDULER_add_delayed (
1203       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
1204       seed_peers, rps_peer);
1205 }
1206
1207 /***********************************
1208  * SEED_BIG
1209 ***********************************/
1210 static void
1211 seed_big_cb (struct RPSPeer *rps_peer)
1212 {
1213   if (GNUNET_YES == in_shutdown)
1214   {
1215     return;
1216   }
1217
1218   // TODO test seeding > GNUNET_MAX_MESSAGE_SIZE peers
1219   GNUNET_SCHEDULER_add_delayed (
1220       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1221       seed_peers_big, rps_peer);
1222 }
1223
1224 /***********************************
1225  * SINGLE_PEER_SEED
1226 ***********************************/
1227 static void
1228 single_peer_seed_cb (struct RPSPeer *rps_peer)
1229 {
1230   // TODO
1231 }
1232
1233 /***********************************
1234  * SEED_REQUEST
1235 ***********************************/
1236 static void
1237 seed_req_cb (struct RPSPeer *rps_peer)
1238 {
1239   if (GNUNET_YES == in_shutdown)
1240   {
1241     return;
1242   }
1243
1244   GNUNET_SCHEDULER_add_delayed (
1245       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1246       seed_peers, rps_peer);
1247   schedule_missing_requests (rps_peer);
1248 }
1249
1250 //TODO start big mal
1251
1252 /***********************************
1253  * REQUEST_CANCEL
1254 ***********************************/
1255 static void
1256 req_cancel_cb (struct RPSPeer *rps_peer)
1257 {
1258   if (GNUNET_YES == in_shutdown)
1259   {
1260     return;
1261   }
1262
1263   schedule_missing_requests (rps_peer);
1264   GNUNET_SCHEDULER_add_delayed (
1265       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1266                                      (cur_test_run.request_interval + 1)),
1267       cancel_request_cb, rps_peer);
1268 }
1269
1270 /***********************************
1271  * CHURN
1272 ***********************************/
1273
1274 static void
1275 churn (void *cls);
1276
1277 /**
1278  * @brief Starts churn
1279  *
1280  * Has signature of #MainTest
1281  *
1282  * This is not implemented too nicely as this is called for each peer, but we
1283  * only need to call it once. (Yes we check that we only schedule the task
1284  * once.)
1285  *
1286  * @param rps_peer The peer it's called for
1287  */
1288 static void
1289 churn_test_cb (struct RPSPeer *rps_peer)
1290 {
1291   if (GNUNET_YES == in_shutdown)
1292   {
1293     return;
1294   }
1295
1296   /* Start churn */
1297   if (HAVE_CHURN == cur_test_run.have_churn && NULL == churn_task)
1298   {
1299     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300                 "Starting churn task\n");
1301     churn_task = GNUNET_SCHEDULER_add_delayed (
1302           GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1303           churn,
1304           NULL);
1305   } else {
1306     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1307                 "Not starting churn task\n");
1308   }
1309
1310   schedule_missing_requests (rps_peer);
1311 }
1312
1313 /***********************************
1314  * PROFILER
1315 ***********************************/
1316
1317 /**
1318  * Callback to be called when RPS service is started or stopped at peers
1319  *
1320  * @param cls NULL
1321  * @param op the operation handle
1322  * @param emsg NULL on success; otherwise an error description
1323  */
1324 static void
1325 churn_cb (void *cls,
1326           struct GNUNET_TESTBED_Operation *op,
1327           const char *emsg)
1328 {
1329   // FIXME
1330   struct OpListEntry *entry = cls;
1331
1332   if (GNUNET_YES == in_shutdown)
1333   {
1334     return;
1335   }
1336
1337   GNUNET_TESTBED_operation_done (entry->op);
1338   if (NULL != emsg)
1339   {
1340     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n");
1341     GNUNET_SCHEDULER_shutdown ();
1342     return;
1343   }
1344   GNUNET_assert (0 != entry->delta);
1345
1346   num_peers_online += entry->delta;
1347
1348   if (PEER_GO_OFFLINE == entry->delta)
1349   { /* Peer hopefully just went offline */
1350     if (GNUNET_YES != rps_peers[entry->index].online)
1351     {
1352       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1353                   "peer %s was expected to go offline but is still marked as online\n",
1354                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1355       GNUNET_break (0);
1356     }
1357     else
1358     {
1359       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1360                   "peer %s probably went offline as expected\n",
1361                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1362     }
1363     rps_peers[entry->index].online = GNUNET_NO;
1364   }
1365
1366   else if (PEER_GO_ONLINE < entry->delta)
1367   { /* Peer hopefully just went online */
1368     if (GNUNET_NO != rps_peers[entry->index].online)
1369     {
1370       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1371                   "peer %s was expected to go online but is still marked as offline\n",
1372                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1373       GNUNET_break (0);
1374     }
1375     else
1376     {
1377       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1378                   "peer %s probably went online as expected\n",
1379                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1380       if (NULL != cur_test_run.pre_test)
1381       {
1382         cur_test_run.pre_test (&rps_peers[entry->index],
1383             rps_peers[entry->index].rps_handle);
1384         schedule_missing_requests (&rps_peers[entry->index]);
1385       }
1386     }
1387     rps_peers[entry->index].online = GNUNET_YES;
1388   }
1389   else
1390   {
1391     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1392         "Invalid value for delta: %i\n", entry->delta);
1393     GNUNET_break (0);
1394   }
1395
1396   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
1397   rps_peers[entry->index].entry_op_manage = NULL;
1398   GNUNET_free (entry);
1399   //if (num_peers_in_round[current_round] == peers_running)
1400   //  run_round ();
1401 }
1402
1403 /**
1404  * @brief Set the rps-service up or down for a specific peer
1405  *
1406  * @param i index of action
1407  * @param j index of peer
1408  * @param delta (#PEER_ONLINE_DELTA) down (-1) or up (1)
1409  * @param prob_go_on_off the probability of the action
1410  */
1411 static void
1412 manage_service_wrapper (unsigned int i, unsigned int j,
1413                         enum PEER_ONLINE_DELTA delta,
1414                         double prob_go_on_off)
1415 {
1416   struct OpListEntry *entry = NULL;
1417   uint32_t prob;
1418
1419   /* make sure that management operation is not already scheduled */
1420   if (NULL != rps_peers[j].entry_op_manage)
1421   {
1422     return;
1423   }
1424
1425   prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1426                                    UINT32_MAX);
1427   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1428               "%u. selected peer (%u: %s) is %s.\n",
1429               i,
1430               j,
1431               GNUNET_i2s (rps_peers[j].peer_id),
1432               (PEER_GO_ONLINE == delta) ? "online" : "offline");
1433   if (prob < prob_go_on_off * UINT32_MAX)
1434   {
1435     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1436                 "%s goes %s\n",
1437                 GNUNET_i2s (rps_peers[j].peer_id),
1438                 (PEER_GO_OFFLINE == delta) ? "offline" : "online");
1439
1440     if (PEER_GO_OFFLINE == delta)
1441       cancel_pending_req_rep (&rps_peers[j]);
1442     entry = make_oplist_entry ();
1443     entry->delta = delta;
1444     entry->index = j;
1445     entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
1446                                                     testbed_peers[j],
1447                                                     "rps",
1448                                                     &churn_cb,
1449                                                     entry,
1450                                                     (PEER_GO_OFFLINE == delta) ? 0 : 1);
1451   }
1452   rps_peers[j].entry_op_manage = entry;
1453 }
1454
1455
1456 static void
1457 churn (void *cls)
1458 {
1459   unsigned int i;
1460   unsigned int j;
1461   double portion_online;
1462   unsigned int *permut;
1463   double prob_go_offline;
1464   double portion_go_online;
1465   double portion_go_offline;
1466
1467   if (GNUNET_YES == in_shutdown)
1468   {
1469     return;
1470   }
1471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472               "Churn function executing\n");
1473
1474   churn_task = NULL; /* Should be invalid by now */
1475
1476   /* Compute the probability for an online peer to go offline
1477    * this round */
1478   portion_online = num_peers_online * 1.0 / num_peers;
1479   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1480               "Portion online: %f\n",
1481               portion_online);
1482   portion_go_online = ((1 - portion_online) * .5 * .66);
1483   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1484               "Portion that should go online: %f\n",
1485               portion_go_online);
1486   portion_go_offline = (portion_online + portion_go_online) - .75;
1487   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1488               "Portion that probably goes offline: %f\n",
1489               portion_go_offline);
1490   prob_go_offline = portion_go_offline / (portion_online * .5);
1491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492               "Probability of a selected online peer to go offline: %f\n",
1493               prob_go_offline);
1494
1495   permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK,
1496                                          (unsigned int) num_peers);
1497
1498   /* Go over 50% randomly chosen peers */
1499   for (i = 0; i < .5 * num_peers; i++)
1500   {
1501     j = permut[i];
1502
1503     /* If online, shut down with certain probability */
1504     if (GNUNET_YES == rps_peers[j].online)
1505     {
1506       manage_service_wrapper (i, j, -1, prob_go_offline);
1507     }
1508
1509     /* If offline, restart with certain probability */
1510     else if (GNUNET_NO == rps_peers[j].online)
1511     {
1512       manage_service_wrapper (i, j, 1, 0.66);
1513     }
1514   }
1515
1516   GNUNET_free (permut);
1517
1518   churn_task = GNUNET_SCHEDULER_add_delayed (
1519         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1520         churn,
1521         NULL);
1522 }
1523
1524
1525 /**
1526  * Initialise given RPSPeer
1527  */
1528 static void profiler_init_peer (struct RPSPeer *rps_peer)
1529 {
1530   if (num_peers - 1 == rps_peer->index)
1531     rps_peer->num_ids_to_request = cur_test_run.num_requests;
1532 }
1533
1534
1535 /**
1536  * Callback to call on receipt of a reply
1537  *
1538  * @param cls closure
1539  * @param n number of peers
1540  * @param recv_peers the received peers
1541  */
1542 static void
1543 profiler_reply_handle (void *cls,
1544                       uint64_t n,
1545                       const struct GNUNET_PeerIdentity *recv_peers)
1546 {
1547   struct RPSPeer *rps_peer;
1548   struct RPSPeer *rcv_rps_peer;
1549   char *file_name;
1550   char *file_name_dh;
1551   unsigned int i;
1552   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1553
1554   rps_peer = pending_rep->rps_peer;
1555   file_name = "/tmp/rps/received_ids";
1556   file_name_dh = "/tmp/rps/diehard_input";
1557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1558               "[%s] got %" PRIu64 " peers:\n",
1559               GNUNET_i2s (rps_peer->peer_id),
1560               n);
1561   for (i = 0; i < n; i++)
1562   {
1563     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1564                 "%u: %s\n",
1565                 i,
1566                 GNUNET_i2s (&recv_peers[i]));
1567     tofile (file_name,
1568              "%s\n",
1569              GNUNET_i2s_full (&recv_peers[i]));
1570     rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
1571     GNUNET_assert (NULL != rcv_rps_peer);
1572     tofile (file_name_dh,
1573              "%" PRIu32 "\n",
1574              (uint32_t) rcv_rps_peer->index);
1575   }
1576   default_reply_handle (cls, n, recv_peers);
1577 }
1578
1579
1580 static void
1581 profiler_cb (struct RPSPeer *rps_peer)
1582 {
1583   if (GNUNET_YES == in_shutdown)
1584   {
1585     return;
1586   }
1587
1588   /* Start churn */
1589   if (HAVE_CHURN == cur_test_run.have_churn && NULL == churn_task)
1590   {
1591     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1592                 "Starting churn task\n");
1593     churn_task = GNUNET_SCHEDULER_add_delayed (
1594           GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1595           churn,
1596           NULL);
1597   } else {
1598     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1599                 "Not starting churn task\n");
1600   }
1601
1602   /* Only request peer ids at one peer.
1603    * (It's the before-last because last one is target of the focussed attack.)
1604    */
1605   if (eval_peer == rps_peer)
1606     schedule_missing_requests (rps_peer);
1607 }
1608
1609 /**
1610  * Function called from #profiler_eval with a filename.
1611  *
1612  * @param cls closure
1613  * @param filename complete filename (absolute path)
1614  * @return #GNUNET_OK to continue to iterate,
1615  *  #GNUNET_NO to stop iteration with no error,
1616  *  #GNUNET_SYSERR to abort iteration with error!
1617  */
1618 int
1619 file_name_cb (void *cls, const char *filename)
1620 {
1621   if (NULL != strstr (filename, "sampler_el"))
1622   {
1623     struct RPS_SamplerElement *s_elem;
1624     struct GNUNET_CRYPTO_AuthKey auth_key;
1625     const char *key_char;
1626     uint32_t i;
1627
1628     key_char = filename + 20; /* Length of "/tmp/rps/sampler_el-" */
1629     tofile (filename, "--------------------------\n");
1630
1631     auth_key = string_to_auth_key (key_char);
1632     s_elem = RPS_sampler_elem_create ();
1633     RPS_sampler_elem_set (s_elem, auth_key);
1634
1635     for (i = 0; i < num_peers; i++)
1636     {
1637       RPS_sampler_elem_next (s_elem, &rps_peer_ids[i]);
1638     }
1639     RPS_sampler_elem_destroy (s_elem);
1640   }
1641   return GNUNET_OK;
1642 }
1643
1644 /**
1645  * This is run after the test finished.
1646  *
1647  * Compute all perfect samples.
1648  */
1649 int
1650 profiler_eval (void)
1651 {
1652   /* Compute perfect sample for each sampler element */
1653   if (-1 == GNUNET_DISK_directory_scan ("/tmp/rps/", file_name_cb, NULL))
1654   {
1655     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Scan of directory failed\n");
1656   }
1657
1658   return evaluate ();
1659 }
1660
1661 /**
1662  * Continuation called by #GNUNET_STATISTICS_get() functions.
1663  *
1664  * Checks whether all peers received their statistics yet.
1665  * Issues the shutdown.
1666  *
1667  * @param cls closure
1668  * @param success #GNUNET_OK if statistics were
1669  *        successfully obtained, #GNUNET_SYSERR if not.
1670  */
1671 void
1672 post_test_shutdown_ready_cb (void *cls,
1673                              int success)
1674 {
1675   const struct RPSPeer *rps_peer = (const struct RPSPeer *) cls;
1676   if (NULL != rps_peer->stat_op)
1677   {
1678     GNUNET_TESTBED_operation_done (rps_peer->stat_op);
1679   }
1680   num_shutdown_ready++;
1681   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1682       "%" PRIu32 " of %" PRIu32 " Peers are ready to shut down\n",
1683       num_shutdown_ready,
1684       num_peers);
1685   if (num_peers <= num_shutdown_ready)
1686   {
1687     GNUNET_SCHEDULER_shutdown ();
1688   }
1689 }
1690
1691 /**
1692  * Callback function to process statistic values.
1693  *
1694  * @param cls closure
1695  * @param subsystem name of subsystem that created the statistic
1696  * @param name the name of the datum
1697  * @param value the current value
1698  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1699  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1700  */
1701 int
1702 stat_iterator (void *cls,
1703                const char *subsystem,
1704                const char *name,
1705                uint64_t value,
1706                int is_persistent)
1707 {
1708   //const struct RPSPeer *rps_peer = (const struct RPSPeer *) cls;
1709   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %" PRIu64 "\n", value);
1710   return GNUNET_OK;
1711 }
1712
1713 void post_profiler (const struct RPSPeer *rps_peer)
1714 {
1715   if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics)
1716   {
1717     GNUNET_STATISTICS_get (rps_peer->stats_h,
1718                            "rps",
1719                            "# rounds",
1720                            post_test_shutdown_ready_cb,
1721                            stat_iterator,
1722                            (struct RPSPeer *) rps_peer);
1723   }
1724 }
1725
1726
1727 /***********************************************************************
1728  * /Definition of tests
1729 ***********************************************************************/
1730
1731
1732 /**
1733  * Actual "main" function for the testcase.
1734  *
1735  * @param cls closure
1736  * @param h the run handle
1737  * @param n_peers number of peers in 'peers'
1738  * @param peers handle to peers run in the testbed
1739  * @param links_succeeded the number of overlay link connection attempts that
1740  *          succeeded
1741  * @param links_failed the number of overlay link connection attempts that
1742  *          failed
1743  */
1744 static void
1745 run (void *cls,
1746      struct GNUNET_TESTBED_RunHandle *h,
1747      unsigned int n_peers,
1748      struct GNUNET_TESTBED_Peer **peers,
1749      unsigned int links_succeeded,
1750      unsigned int links_failed)
1751 {
1752   unsigned int i;
1753   struct OpListEntry *entry;
1754   uint32_t num_mal_peers;
1755
1756   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "RUN was called\n");
1757
1758   /* Check whether we timed out */
1759   if (n_peers != num_peers ||
1760       NULL == peers ||
1761       0 == links_succeeded)
1762   {
1763     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Going down due to args (eg. timeout)\n");
1764     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tn_peers: %u\n", n_peers);
1765     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tnum_peers: %" PRIu32 "\n", num_peers);
1766     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tpeers: %p\n", peers);
1767     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tlinks_succeeded: %u\n", links_succeeded);
1768     GNUNET_SCHEDULER_shutdown ();
1769     return;
1770   }
1771
1772
1773   /* Initialize peers */
1774   testbed_peers = peers;
1775   num_peers_online = 0;
1776   for (i = 0; i < num_peers; i++)
1777   {
1778     entry = make_oplist_entry ();
1779     entry->index = i;
1780     rps_peers[i].index = i;
1781     if (NULL != cur_test_run.init_peer)
1782       cur_test_run.init_peer (&rps_peers[i]);
1783     entry->op = GNUNET_TESTBED_peer_get_information (peers[i],
1784                                                      GNUNET_TESTBED_PIT_IDENTITY,
1785                                                      &info_cb,
1786                                                      entry);
1787   }
1788
1789   /* Bring peers up */
1790   num_mal_peers = round (portion * num_peers);
1791   GNUNET_assert (num_peers == n_peers);
1792   for (i = 0; i < n_peers; i++)
1793   {
1794     rps_peers[i].index = i;
1795     if ( (rps_peers[i].num_recv_ids < rps_peers[i].num_ids_to_request) ||
1796          (i < num_mal_peers) )
1797     {
1798       rps_peers[i].op =
1799         GNUNET_TESTBED_service_connect (&rps_peers[i],
1800                                         peers[i],
1801                                         "rps",
1802                                         &rps_connect_complete_cb,
1803                                         &rps_peers[i],
1804                                         &rps_connect_adapter,
1805                                         &rps_disconnect_adapter,
1806                                         &rps_peers[i]);
1807     }
1808     /* Connect all peers to statistics service */
1809     if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics)
1810     {
1811       rps_peers[i].stat_op =
1812         GNUNET_TESTBED_service_connect (NULL,
1813                                         peers[i],
1814                                         "statistics",
1815                                         stat_complete_cb,
1816                                         &rps_peers[i],
1817                                         &stat_connect_adapter,
1818                                         &stat_disconnect_adapter,
1819                                         &rps_peers[i]);
1820     }
1821   }
1822
1823   if (NULL != churn_task)
1824     GNUNET_SCHEDULER_cancel (churn_task);
1825   shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
1826 }
1827
1828
1829 /**
1830  * Entry point for the testcase, sets up the testbed.
1831  *
1832  * @param argc unused
1833  * @param argv unused
1834  * @return 0 on success
1835  */
1836 int
1837 main (int argc, char *argv[])
1838 {
1839   int ret_value;
1840
1841   num_peers = 5;
1842   num_shutdown_ready = 0;
1843   cur_test_run.name = "test-rps-default";
1844   cur_test_run.init_peer = default_init_peer;
1845   cur_test_run.pre_test = NULL;
1846   cur_test_run.reply_handle = default_reply_handle;
1847   cur_test_run.eval_cb = default_eval_cb;
1848   cur_test_run.post_test = NULL;
1849   cur_test_run.have_churn = HAVE_CHURN;
1850   cur_test_run.have_collect_statistics = NO_COLLECT_STATISTICS;
1851   churn_task = NULL;
1852   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
1853
1854   if (strstr (argv[0], "malicious") != NULL)
1855   {
1856     cur_test_run.pre_test = mal_pre;
1857     cur_test_run.main_test = mal_cb;
1858     cur_test_run.init_peer = mal_init_peer;
1859
1860     if (strstr (argv[0], "_1") != NULL)
1861     {
1862       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 1\n");
1863       cur_test_run.name = "test-rps-malicious_1";
1864       mal_type = 1;
1865     }
1866     else if (strstr (argv[0], "_2") != NULL)
1867     {
1868       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 2\n");
1869       cur_test_run.name = "test-rps-malicious_2";
1870       mal_type = 2;
1871     }
1872     else if (strstr (argv[0], "_3") != NULL)
1873     {
1874       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 3\n");
1875       cur_test_run.name = "test-rps-malicious_3";
1876       mal_type = 3;
1877     }
1878   }
1879
1880   else if (strstr (argv[0], "_single_req") != NULL)
1881   {
1882     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test single request\n");
1883     cur_test_run.name = "test-rps-single-req";
1884     cur_test_run.main_test = single_req_cb;
1885     cur_test_run.have_churn = HAVE_NO_CHURN;
1886   }
1887
1888   else if (strstr (argv[0], "_delayed_reqs") != NULL)
1889   {
1890     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test delayed requests\n");
1891     cur_test_run.name = "test-rps-delayed-reqs";
1892     cur_test_run.main_test = delay_req_cb;
1893     cur_test_run.have_churn = HAVE_NO_CHURN;
1894   }
1895
1896   else if (strstr (argv[0], "_seed_big") != NULL)
1897   {
1898     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding (num_peers > GNUNET_MAX_MESSAGE_SIZE)\n");
1899     num_peers = 1;
1900     cur_test_run.name = "test-rps-seed-big";
1901     cur_test_run.main_test = seed_big_cb;
1902     cur_test_run.eval_cb = no_eval;
1903     cur_test_run.have_churn = HAVE_NO_CHURN;
1904     timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
1905   }
1906
1907   else if (strstr (argv[0], "_single_peer_seed") != NULL)
1908   {
1909     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding and requesting on a single peer\n");
1910     cur_test_run.name = "test-rps-single-peer-seed";
1911     cur_test_run.main_test = single_peer_seed_cb;
1912     cur_test_run.have_churn = HAVE_NO_CHURN;
1913   }
1914
1915   else if (strstr (argv[0], "_seed_request") != NULL)
1916   {
1917     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding and requesting on multiple peers\n");
1918     cur_test_run.name = "test-rps-seed-request";
1919     cur_test_run.main_test = seed_req_cb;
1920     cur_test_run.have_churn = HAVE_NO_CHURN;
1921   }
1922
1923   else if (strstr (argv[0], "_seed") != NULL)
1924   {
1925     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding\n");
1926     cur_test_run.name = "test-rps-seed";
1927     cur_test_run.main_test = seed_cb;
1928     cur_test_run.eval_cb = no_eval;
1929     cur_test_run.have_churn = HAVE_NO_CHURN;
1930   }
1931
1932   else if (strstr (argv[0], "_req_cancel") != NULL)
1933   {
1934     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n");
1935     cur_test_run.name = "test-rps-req-cancel";
1936     num_peers = 1;
1937     cur_test_run.main_test = req_cancel_cb;
1938     cur_test_run.eval_cb = no_eval;
1939     cur_test_run.have_churn = HAVE_NO_CHURN;
1940     timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
1941   }
1942
1943   else if (strstr (argv[0], "_churn") != NULL)
1944   {
1945     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test churn\n");
1946     cur_test_run.name = "test-rps-churn";
1947     num_peers = 5;
1948     cur_test_run.init_peer = default_init_peer;
1949     cur_test_run.main_test = churn_test_cb;
1950     cur_test_run.reply_handle = default_reply_handle;
1951     cur_test_run.eval_cb = default_eval_cb;
1952     cur_test_run.have_churn = HAVE_CHURN;
1953     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
1954     timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
1955   }
1956
1957   else if (strstr (argv[0], "profiler") != NULL)
1958   {
1959     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
1960     cur_test_run.name = "test-rps-profiler";
1961     num_peers = 10;
1962     mal_type = 3;
1963     cur_test_run.init_peer = profiler_init_peer;
1964     cur_test_run.pre_test = mal_pre;
1965     cur_test_run.main_test = profiler_cb;
1966     cur_test_run.reply_handle = profiler_reply_handle;
1967     cur_test_run.eval_cb = profiler_eval;
1968     cur_test_run.post_test = post_profiler;
1969     cur_test_run.request_interval = 2;
1970     cur_test_run.num_requests = 5;
1971     cur_test_run.have_churn = HAVE_CHURN;
1972     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
1973     cur_test_run.have_collect_statistics = COLLECT_STATISTICS;
1974     timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 300);
1975
1976     /* 'Clean' directory */
1977     (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
1978     GNUNET_DISK_directory_create ("/tmp/rps/");
1979   }
1980
1981   rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
1982   peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
1983   rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1984   if ( (2 == mal_type) ||
1985        (3 == mal_type))
1986     target_peer = &rps_peer_ids[num_peers - 2];
1987   if (profiler_eval == cur_test_run.eval_cb)
1988     eval_peer = &rps_peers[num_peers - 1];    /* FIXME: eval_peer could be a
1989                                                  malicious peer if not careful
1990                                                  with the malicious portion */
1991
1992   ok = 1;
1993   ret_value = GNUNET_TESTBED_test_run (cur_test_run.name,
1994                                        "test_rps.conf",
1995                                        num_peers,
1996                                        0, NULL, NULL,
1997                                        &run, NULL);
1998   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1999               "_test_run returned.\n");
2000   if (GNUNET_OK != ret_value)
2001   {
2002     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2003                 "Test did not run successfully!\n");
2004   }
2005
2006   ret_value = cur_test_run.eval_cb();
2007   GNUNET_free (rps_peers);
2008   GNUNET_free (rps_peer_ids);
2009   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
2010   return ret_value;
2011 }
2012
2013 /* end of test_rps.c */