3d3853d647bd37e8118f723f477ada87c323c9c8
[oweals/gnunet.git] / src / rps / test_rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file rps/test_rps_multipeer.c
22  * @brief Testcase for the random peer sampling service.  Starts
23  *        a peergroup with a given number of peers, then waits to
24  *        receive size pushes/pulls from each peer.  Expects to wait
25  *        for one message from each peer.
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_testbed_service.h"
30
31 #include "gnunet_rps_service.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler_elem.h"
34
35 #include <inttypes.h>
36
37
38 /**
39  * How many peers do we start?
40  */
41 uint32_t num_peers;
42
43 /**
44  * How long do we run the test?
45  */
46 //#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
47 static struct GNUNET_TIME_Relative timeout;
48
49
50 /**
51  * Portion of malicious peers
52  */
53 static double portion = .1;
54
55 /**
56  * Type of malicious peer to test
57  */
58 static unsigned int mal_type = 0;
59
60 /**
61  * Handles to all of the running peers
62  */
63 static struct GNUNET_TESTBED_Peer **testbed_peers;
64
65
66 /**
67  * Operation map entry
68  */
69 struct OpListEntry
70 {
71   /**
72    * DLL next ptr
73    */
74   struct OpListEntry *next;
75
76   /**
77    * DLL prev ptr
78    */
79   struct OpListEntry *prev;
80
81   /**
82    * The testbed operation
83    */
84   struct GNUNET_TESTBED_Operation *op;
85
86   /**
87    * Depending on whether we start or stop NSE service at the peer set this to 1
88    * or -1
89    */
90   int delta;
91
92   /**
93    * Index of the regarding peer
94    */
95   unsigned int index;
96 };
97
98 /**
99  * OpList DLL head
100  */
101 static struct OpListEntry *oplist_head;
102
103 /**
104  * OpList DLL tail
105  */
106 static struct OpListEntry *oplist_tail;
107
108
109 /**
110  * A pending reply: A request was sent and the reply is pending.
111  */
112 struct PendingReply
113 {
114   /**
115    * DLL next,prev ptr
116    */
117   struct PendingReply *next;
118   struct PendingReply *prev;
119
120   /**
121    * Handle to the request we are waiting for
122    */
123   struct GNUNET_RPS_Request_Handle *req_handle;
124
125   /**
126    * The peer that requested
127    */
128   struct RPSPeer *rps_peer;
129 };
130
131
132 /**
133  * A pending request: A request was not made yet but is scheduled for later.
134  */
135 struct PendingRequest
136 {
137   /**
138    * DLL next,prev ptr
139    */
140   struct PendingRequest *next;
141   struct PendingRequest *prev;
142
143   /**
144    * Handle to the request we are waiting for
145    */
146   struct GNUNET_SCHEDULER_Task *request_task;
147
148   /**
149    * The peer that requested
150    */
151   struct RPSPeer *rps_peer;
152 };
153
154
155 /**
156  * Information we track for each peer.
157  */
158 struct RPSPeer
159 {
160   /**
161    * Index of the peer.
162    */
163   unsigned int index;
164
165   /**
166    * Handle for RPS connect operation.
167    */
168   struct GNUNET_TESTBED_Operation *op;
169
170   /**
171    * Handle to RPS service.
172    */
173   struct GNUNET_RPS_Handle *rps_handle;
174
175   /**
176    * ID of the peer.
177    */
178   struct GNUNET_PeerIdentity *peer_id;
179
180   /**
181    * A request handle to check for an request
182    */
183   //struct GNUNET_RPS_Request_Handle *req_handle;
184
185   /**
186    * Peer on- or offline?
187    */
188   int online;
189
190   /**
191    * Number of Peer IDs to request
192    */
193   unsigned int num_ids_to_request;
194
195   /**
196    * Pending requests DLL
197    */
198   struct PendingRequest *pending_req_head;
199   struct PendingRequest *pending_req_tail;
200
201   /**
202    * Number of pending requests
203    */
204   unsigned int num_pending_reqs;
205
206   /**
207    * Pending replies DLL
208    */
209   struct PendingReply *pending_rep_head;
210   struct PendingReply *pending_rep_tail;
211
212   /**
213    * Number of pending replies
214    */
215   unsigned int num_pending_reps;
216
217   /**
218    * Received PeerIDs
219    */
220   struct GNUNET_PeerIdentity *rec_ids;
221
222   /**
223    * Number of received PeerIDs
224    */
225   unsigned int num_rec_ids;
226 };
227
228
229 /**
230  * Information for all the peers.
231  */
232 static struct RPSPeer *rps_peers;
233
234 /**
235  * Peermap to get the index of a given peer ID quick.
236  */
237 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
238
239 /**
240  * IDs of the peers.
241  */
242 static struct GNUNET_PeerIdentity *rps_peer_ids;
243
244 /**
245  * ID of the targeted peer.
246  */
247 static struct GNUNET_PeerIdentity *target_peer;
248
249 /**
250  * ID of the peer that requests for the evaluation.
251  */
252 static struct RPSPeer *eval_peer;
253
254 /**
255  * Number of online peers.
256  */
257 static unsigned int num_peers_online;
258
259 /**
260  * Return value from 'main'.
261  */
262 static int ok;
263
264
265 /**
266  * Identifier for the churn task that runs periodically
267  */
268 static struct GNUNET_SCHEDULER_Task *churn_task;
269
270 /**
271  * Identifier for the churn task that runs periodically
272  */
273 static struct GNUNET_SCHEDULER_Task *shutdown_task;
274
275
276 /**
277  * Called to initialise the given RPSPeer
278  */
279 typedef void (*InitPeer) (struct RPSPeer *rps_peer);
280
281 /**
282  * Called directly after connecting to the service
283  */
284 typedef void (*PreTest) (void *cls, struct GNUNET_RPS_Handle *h);
285
286 /**
287  * Called from within #rps_connect_complete_cb ()
288  * Executes functions to test the api/service
289  */
290 typedef void (*MainTest) (struct RPSPeer *rps_peer);
291
292 /**
293  * Callback called once the requested random peers are available
294  */
295 typedef void (*ReplyHandle) (void *cls,
296                              uint64_t n,
297                              const struct GNUNET_PeerIdentity *recv_peers);
298
299 /**
300  * Called directly before disconnecting from the service
301  */
302 typedef void (*PostTest) (void *cls, struct GNUNET_RPS_Handle *h);
303
304 /**
305  * Function called after disconnect to evaluate test success
306  */
307 typedef int (*EvaluationCallback) (void);
308
309
310 /**
311  * Structure to define a single test
312  */
313 struct SingleTestRun
314 {
315   /**
316    * Name of the test
317    */
318   char *name;
319
320   /**
321    * Called to initialise peer
322    */
323   InitPeer init_peer;
324
325   /**
326    * Called directly after connecting to the service
327    */
328   PreTest pre_test;
329
330   /**
331    * Function to execute the functions to be tested
332    */
333   MainTest main_test;
334
335   /**
336    * Callback called once the requested peers are available
337    */
338   ReplyHandle reply_handle;
339
340   /**
341    * Called directly before disconnecting from the service
342    */
343   PostTest post_test;
344
345   /**
346    * Function to evaluate the test results
347    */
348   EvaluationCallback eval_cb;
349
350   /**
351    * Request interval
352    */
353   uint32_t request_interval;
354
355   /**
356    * Number of Requests to make.
357    */
358   uint32_t num_requests;
359 } cur_test_run;
360
361 /**
362  * Are we shutting down?
363  */
364 static int in_shutdown;
365
366 /**
367  * Append arguments to file
368  */
369 static void
370 tofile_ (const char *file_name, char *line)
371 {
372   struct GNUNET_DISK_FileHandle *f;
373   /* char output_buffer[512]; */
374   size_t size;
375   /* int size; */
376   size_t size2;
377
378   if (NULL == (f = GNUNET_DISK_file_open (file_name,
379                                           GNUNET_DISK_OPEN_APPEND |
380                                           GNUNET_DISK_OPEN_WRITE |
381                                           GNUNET_DISK_OPEN_CREATE,
382                                           GNUNET_DISK_PERM_USER_READ |
383                                           GNUNET_DISK_PERM_USER_WRITE |
384                                           GNUNET_DISK_PERM_GROUP_READ |
385                                           GNUNET_DISK_PERM_OTHER_READ)))
386   {
387     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
388                 "Not able to open file %s\n",
389                 file_name);
390     return;
391   }
392   /* size = GNUNET_snprintf (output_buffer,
393                           sizeof (output_buffer),
394                           "%llu %s\n",
395                           GNUNET_TIME_absolute_get ().abs_value_us,
396                           line);
397   if (0 > size)
398   {
399     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
400                 "Failed to write string to buffer (size: %i)\n",
401                 size);
402     return;
403   } */
404
405   size = strlen (line) * sizeof (char);
406
407   size2 = GNUNET_DISK_file_write (f, line, size);
408   if (size != size2)
409   {
410     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
411                 "Unable to write to file! (Size: %u, size2: %u)\n",
412                 size,
413                 size2);
414     return;
415   }
416
417   if (GNUNET_YES != GNUNET_DISK_file_close (f))
418     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
419                 "Unable to close file\n");
420 }
421
422 /**
423  * This function is used to facilitate writing important information to disk
424  */
425 #define tofile(file_name, ...) do {\
426   char tmp_buf[512];\
427     int size;\
428     size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
429     if (0 > size)\
430       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
431                      "Failed to create tmp_buf\n");\
432     else\
433       tofile_(file_name,tmp_buf);\
434   } while (0);
435
436
437 /**
438  * Write the ids and their according index in the given array to a file 
439  * Unused
440  */
441 /* static void
442 ids_to_file (char *file_name,
443              struct GNUNET_PeerIdentity *peer_ids,
444              unsigned int num_peer_ids)
445 {
446   unsigned int i;
447
448   for (i=0 ; i < num_peer_ids ; i++)
449   {
450     to_file (file_name,
451              "%u\t%s",
452              i,
453              GNUNET_i2s_full (&peer_ids[i]));
454   }
455 } */
456
457 /**
458  * Test the success of a single test
459  */
460 static int
461 evaluate (void)
462 {
463   unsigned int i;
464   int tmp_ok;
465
466   tmp_ok = 1;
467
468   for (i = 0; i < num_peers; i++)
469   {
470     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471         "%u. peer [%s] received %u of %u expected peer_ids: %i\n",
472         i,
473         GNUNET_i2s (rps_peers[i].peer_id),
474         rps_peers[i].num_rec_ids,
475         rps_peers[i].num_ids_to_request,
476         (rps_peers[i].num_ids_to_request == rps_peers[i].num_rec_ids));
477     tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_rec_ids);
478   }
479   return tmp_ok? 0 : 1;
480 }
481
482
483 /**
484  * Creates an oplist entry and adds it to the oplist DLL
485  */
486 static struct OpListEntry *
487 make_oplist_entry ()
488 {
489   struct OpListEntry *entry;
490
491   entry = GNUNET_new (struct OpListEntry);
492   GNUNET_CONTAINER_DLL_insert_tail (oplist_head, oplist_tail, entry);
493   return entry;
494 }
495
496
497 /**
498  * Task run on timeout to shut everything down.
499  */
500 static void
501 shutdown_op (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
502 {
503   unsigned int i;
504
505   in_shutdown = GNUNET_YES;
506   if (NULL != churn_task)
507     GNUNET_SCHEDULER_cancel (churn_task);
508
509   for (i = 0 ; i < num_peers ; i++)
510     GNUNET_TESTBED_operation_done (rps_peers[i].op);
511   GNUNET_SCHEDULER_shutdown ();
512 }
513
514
515 /**
516  * Seed peers.
517  */
518   void
519 seed_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
520 {
521   unsigned int amount;
522   struct RPSPeer *peer = (struct RPSPeer *) cls;
523   unsigned int i;
524
525   // TODO if malicious don't seed mal peers
526   amount = round (.5 * num_peers);
527
528   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding peers:\n");
529   for (i = 0 ; i < amount ; i++)
530     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
531                 i,
532                 GNUNET_i2s (&rps_peer_ids[i]));
533
534   GNUNET_RPS_seed_ids (peer->rps_handle, amount, rps_peer_ids);
535 }
536
537
538 /**
539  * Get the id of peer i.
540  */
541   void
542 info_cb (void *cb_cls,
543          struct GNUNET_TESTBED_Operation *op,
544          const struct GNUNET_TESTBED_PeerInformation *pinfo,
545          const char *emsg)
546 {
547   struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
548
549   if (NULL == pinfo || NULL != emsg)
550   {
551     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg);
552     return;
553   }
554
555   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
556               "Peer %u is %s\n",
557               entry->index,
558               GNUNET_i2s (pinfo->result.id));
559
560   rps_peer_ids[entry->index] = *(pinfo->result.id);
561   rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index];
562   rps_peers[entry->index].rec_ids = NULL;
563   rps_peers[entry->index].num_rec_ids = 0;
564
565   GNUNET_CONTAINER_multipeermap_put (peer_map,
566       &rps_peer_ids[entry->index],
567       &rps_peers[entry->index],
568       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
569   tofile ("/tmp/rps/peer_ids",
570            "%u\t%s\n",
571            entry->index,
572            GNUNET_i2s_full (&rps_peer_ids[entry->index]));
573
574   if (NULL != cur_test_run.init_peer)
575     cur_test_run.init_peer (&rps_peers[entry->index]);
576
577   GNUNET_TESTBED_operation_done (entry->op);
578   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
579   GNUNET_free (entry);
580 }
581
582
583 /**
584  * Callback to be called when RPS service connect operation is completed
585  *
586  * @param cls the callback closure from functions generating an operation
587  * @param op the operation that has been finished
588  * @param ca_result the RPS service handle returned from rps_connect_adapter
589  * @param emsg error message in case the operation has failed; will be NULL if
590  *          operation has executed successfully.
591  */
592 static void
593 rps_connect_complete_cb (void *cls,
594                          struct GNUNET_TESTBED_Operation *op,
595                          void *ca_result,
596                          const char *emsg)
597 {
598   struct RPSPeer *rps_peer = cls;
599   struct GNUNET_RPS_Handle *rps = ca_result;
600
601   rps_peer->rps_handle = rps;
602   rps_peer->online = GNUNET_YES;
603   num_peers_online++;
604
605   GNUNET_assert (op == rps_peer->op);
606   if (NULL != emsg)
607   {
608     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
609                 "Failed to connect to RPS service: %s\n",
610                 emsg);
611     ok = 1;
612     GNUNET_SCHEDULER_shutdown ();
613     return;
614   }
615
616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Started client successfully\n");
617
618   cur_test_run.main_test (rps_peer);
619 }
620
621
622 /**
623  * Adapter function called to establish a connection to
624  * the RPS service.
625  *
626  * @param cls closure
627  * @param cfg configuration of the peer to connect to; will be available until
628  *          GNUNET_TESTBED_operation_done() is called on the operation returned
629  *          from GNUNET_TESTBED_service_connect()
630  * @return service handle to return in 'op_result', NULL on error
631  */
632 static void *
633 rps_connect_adapter (void *cls,
634                                  const struct GNUNET_CONFIGURATION_Handle *cfg)
635 {
636   struct GNUNET_RPS_Handle *h;
637
638   h = GNUNET_RPS_connect (cfg);
639
640   if (NULL != cur_test_run.pre_test)
641     cur_test_run.pre_test (cls, h);
642
643   return h;
644 }
645
646
647 /**
648  * Adapter function called to destroy connection to
649  * RPS service.
650  *
651  * @param cls closure
652  * @param op_result service handle returned from the connect adapter
653  */
654 static void
655 rps_disconnect_adapter (void *cls,
656                                           void *op_result)
657 {
658   struct GNUNET_RPS_Handle *h = op_result;
659   GNUNET_RPS_disconnect (h);
660 }
661
662
663 /***********************************************************************
664  * Definition of tests
665 ***********************************************************************/
666
667 // TODO check whether tests can be stopped earlier
668 static int
669 default_eval_cb (void)
670 {
671   return evaluate ();
672 }
673
674 static int
675 no_eval (void)
676 {
677   return 0;
678 }
679
680 /**
681  * Initialise given RPSPeer
682  */
683 static void default_init_peer (struct RPSPeer *rps_peer)
684 {
685   rps_peer->num_ids_to_request = 1;
686 }
687
688 /**
689  * Callback to call on receipt of a reply
690  *
691  * @param cls closure
692  * @param n number of peers
693  * @param recv_peers the received peers
694  */
695 static void
696 default_reply_handle (void *cls,
697                       uint64_t n,
698                       const struct GNUNET_PeerIdentity *recv_peers)
699 {
700   struct RPSPeer *rps_peer;
701   struct PendingReply *pending_rep = (struct PendingReply *) cls;
702   unsigned int i;
703
704   rps_peer = pending_rep->rps_peer;
705   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
706                                rps_peer->pending_rep_tail,
707                                pending_rep);
708   rps_peer->num_pending_reps--;
709   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710               "[%s] got %" PRIu64 " peers:\n",
711               GNUNET_i2s (rps_peer->peer_id),
712               n);
713   
714   for (i = 0; i < n; i++)
715   {
716     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
717                 "%u: %s\n",
718                 i,
719                 GNUNET_i2s (&recv_peers[i]));
720
721     GNUNET_array_append (rps_peer->rec_ids, rps_peer->num_rec_ids, recv_peers[i]);
722   }
723
724   if (0 == evaluate ())
725   {
726     GNUNET_SCHEDULER_cancel (shutdown_task);
727     shutdown_task = GNUNET_SCHEDULER_add_now (&shutdown_op, NULL);
728   }
729 }
730
731 /**
732  * Request random peers.
733  */
734 static void
735 request_peers (void *cls,
736                const struct GNUNET_SCHEDULER_TaskContext *tc)
737 {
738   struct RPSPeer *rps_peer;
739   struct PendingRequest *pending_req = (struct PendingRequest *) cls;
740   struct PendingReply *pending_rep;
741
742   if (GNUNET_YES == in_shutdown)
743     return;
744   rps_peer = pending_req->rps_peer;
745   GNUNET_assert (1 <= rps_peer->num_pending_reqs);
746   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
747                                rps_peer->pending_req_tail,
748                                pending_req);
749   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750               "Requesting one peer\n");
751   pending_rep = GNUNET_new (struct PendingReply);
752   pending_rep->rps_peer = rps_peer;
753   pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
754       1,
755       cur_test_run.reply_handle,
756       pending_rep);
757   GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
758                                     rps_peer->pending_rep_tail,
759                                     pending_rep);
760   rps_peer->num_pending_reps++;
761   rps_peer->num_pending_reqs--;
762 }
763
764 static void
765 cancel_pending_req (struct PendingRequest *pending_req)
766 {
767   struct RPSPeer *rps_peer;
768
769   rps_peer = pending_req->rps_peer;
770   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
771                                rps_peer->pending_req_tail,
772                                pending_req);
773   rps_peer->num_pending_reqs--;
774   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775               "Cancelling pending request\n");
776   GNUNET_SCHEDULER_cancel (pending_req->request_task);
777   GNUNET_free (pending_req);
778 }
779
780 static void
781 cancel_request (struct PendingReply *pending_rep)
782 {
783   struct RPSPeer *rps_peer;
784
785   rps_peer = pending_rep->rps_peer;
786   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
787                                rps_peer->pending_rep_tail,
788                                pending_rep);
789   rps_peer->num_pending_reps--;
790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791               "Cancelling request\n");
792   GNUNET_RPS_request_cancel (pending_rep->req_handle);
793   GNUNET_free (pending_rep);
794 }
795
796 /**
797  * Cancel a request.
798  */
799 static void
800 cancel_request_cb (void *cls,
801                 const struct GNUNET_SCHEDULER_TaskContext *tc)
802 {
803   struct PendingReply *pending_rep;
804   struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
805
806   if (GNUNET_YES == in_shutdown)
807     return;
808   pending_rep = rps_peer->pending_rep_head;
809   GNUNET_assert (1 <= rps_peer->num_pending_reps);
810   cancel_request (pending_rep);
811 }
812
813
814 /**
815  * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
816  * issued, nor replied
817  */
818 void
819 schedule_missing_requests (struct RPSPeer *rps_peer)
820 {
821   unsigned int i;
822   struct PendingRequest *pending_req;
823
824   for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
825        i < rps_peer->num_ids_to_request; i++)
826   {
827     pending_req = GNUNET_new (struct PendingRequest);
828     pending_req->rps_peer = rps_peer;
829     pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
830         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
831           cur_test_run.request_interval * i),
832         request_peers,
833         pending_req);
834     GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
835                                       rps_peer->pending_req_tail,
836                                       pending_req);
837     rps_peer->num_pending_reqs++;
838   }
839 }
840
841 void
842 cancel_pending_req_rep (struct RPSPeer *rps_peer)
843 {
844   while (NULL != rps_peer->pending_req_head)
845     cancel_pending_req (rps_peer->pending_req_head);
846   GNUNET_assert (0 == rps_peer->num_pending_reqs);
847   while (NULL != rps_peer->pending_rep_head)
848     cancel_request (rps_peer->pending_rep_head);
849   GNUNET_assert (0 == rps_peer->num_pending_reps);
850 }
851
852 /***********************************
853  * MALICIOUS
854 ***********************************/
855
856 /**
857  * Initialise only non-mal RPSPeers
858  */
859 static void mal_init_peer (struct RPSPeer *rps_peer)
860 {
861   if (rps_peer->index >= round (portion * num_peers))
862     rps_peer->num_ids_to_request = 1;
863 }
864
865 static void
866 mal_pre (void *cls, struct GNUNET_RPS_Handle *h)
867 {
868   #ifdef ENABLE_MALICIOUS
869   uint32_t num_mal_peers;
870   struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
871
872   GNUNET_assert ( (1 >= portion) &&
873                   (0 <  portion) );
874   num_mal_peers = round (portion * num_peers);
875
876   if (rps_peer->index < num_mal_peers)
877   {
878     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
879                 "%u. peer [%s] of %" PRIu32 " malicious peers turning malicious\n",
880                 rps_peer->index,
881                 GNUNET_i2s (rps_peer->peer_id),
882                 num_mal_peers);
883
884     GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
885                               rps_peer_ids, target_peer);
886   }
887   #endif /* ENABLE_MALICIOUS */
888 }
889
890 static void
891 mal_cb (struct RPSPeer *rps_peer)
892 {
893   uint32_t num_mal_peers;
894
895   #ifdef ENABLE_MALICIOUS
896   GNUNET_assert ( (1 >= portion) &&
897                   (0 <  portion) );
898   num_mal_peers = round (portion * num_peers);
899
900   if (rps_peer->index >= num_mal_peers)
901   { /* It's useless to ask a malicious peer about a random sample -
902        it's not sampling */
903     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
904                                   seed_peers, rps_peer);
905     schedule_missing_requests (rps_peer);
906   }
907   #endif /* ENABLE_MALICIOUS */
908 }
909
910
911 /***********************************
912  * SINGLE_REQUEST
913 ***********************************/
914 static void
915 single_req_cb (struct RPSPeer *rps_peer)
916 {
917   schedule_missing_requests (rps_peer);
918 }
919
920 /***********************************
921  * DELAYED_REQUESTS
922 ***********************************/
923 static void
924 delay_req_cb (struct RPSPeer *rps_peer)
925 {
926   schedule_missing_requests (rps_peer);
927 }
928
929 /***********************************
930  * SEED
931 ***********************************/
932 static void
933 seed_cb (struct RPSPeer *rps_peer)
934 {
935   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
936                                 seed_peers, rps_peer);
937 }
938
939 /***********************************
940  * SEED_BIG
941 ***********************************/
942 static void
943 seed_big_cb (struct RPSPeer *rps_peer)
944 {
945   // TODO test seeding > GNUNET_SERVER_MAX_MESSAGE_SIZE peers
946 }
947
948 /***********************************
949  * SINGLE_PEER_SEED
950 ***********************************/
951 static void
952 single_peer_seed_cb (struct RPSPeer *rps_peer)
953 {
954   // TODO
955 }
956
957 /***********************************
958  * SEED_REQUEST
959 ***********************************/
960 static void
961 seed_req_cb (struct RPSPeer *rps_peer)
962 {
963   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
964                                 seed_peers, rps_peer);
965   schedule_missing_requests (rps_peer);
966 }
967
968 //TODO start big mal
969
970 /***********************************
971  * REQUEST_CANCEL
972 ***********************************/
973 static void
974 req_cancel_cb (struct RPSPeer *rps_peer)
975 {
976   schedule_missing_requests (rps_peer);
977   GNUNET_SCHEDULER_add_delayed (
978       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
979                                      (cur_test_run.request_interval + 1)),
980       cancel_request_cb, rps_peer);
981 }
982
983 /***********************************
984  * PROFILER
985 ***********************************/
986
987 /**
988  * Callback to be called when RPS service is started or stopped at peers
989  *
990  * @param cls NULL
991  * @param op the operation handle
992  * @param emsg NULL on success; otherwise an error description
993  */
994 static void
995 churn_cb (void *cls,
996           struct GNUNET_TESTBED_Operation *op,
997           const char *emsg)
998 {
999   // FIXME
1000   struct OpListEntry *entry = cls;
1001
1002   GNUNET_TESTBED_operation_done (entry->op);
1003   if (NULL != emsg)
1004   {
1005     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n");
1006     GNUNET_SCHEDULER_shutdown ();
1007     return;
1008   }
1009   GNUNET_assert (0 != entry->delta);
1010
1011   num_peers_online += entry->delta;
1012
1013   if (0 > entry->delta)
1014   { /* Peer hopefully just went offline */
1015     if (GNUNET_YES != rps_peers[entry->index].online)
1016     {
1017       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1018                   "peer %s was expected to go offline but is still marked as online\n",
1019                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1020       GNUNET_break (0);
1021     }
1022     else
1023     {
1024       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025                   "peer %s probably went offline as expected\n",
1026                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1027     }
1028     rps_peers[entry->index].online = GNUNET_NO;
1029   }
1030
1031   else if (0 < entry->delta)
1032   { /* Peer hopefully just went online */
1033     if (GNUNET_NO != rps_peers[entry->index].online)
1034     {
1035       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1036                   "peer %s was expected to go online but is still marked as offline\n",
1037                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1038       GNUNET_break (0);
1039     }
1040     else
1041     {
1042       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043                   "peer %s probably went online as expected\n",
1044                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1045       if (NULL != cur_test_run.pre_test)
1046       {
1047         cur_test_run.pre_test (&rps_peers[entry->index],
1048             rps_peers[entry->index].rps_handle);
1049         schedule_missing_requests (&rps_peers[entry->index]);
1050       }
1051     }
1052     rps_peers[entry->index].online = GNUNET_YES;
1053   }
1054
1055   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
1056   GNUNET_free (entry);
1057   //if (num_peers_in_round[current_round] == peers_running)
1058   //  run_round ();
1059 }
1060
1061 static void
1062 manage_service_wrapper (unsigned int i, unsigned int j, int delta,
1063     double prob_go_on_off)
1064 {
1065   struct OpListEntry *entry;
1066   uint32_t prob;
1067
1068   prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1069                                    UINT32_MAX);
1070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071               "%u. selected peer (%u: %s) is %s.\n",
1072               i,
1073               j,
1074               GNUNET_i2s (rps_peers[j].peer_id),
1075               (delta < 0)? "online" : "offline");
1076   if (prob < prob_go_on_off * UINT32_MAX)
1077   {
1078     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079                 "%s goes %s\n",
1080                 GNUNET_i2s (rps_peers[j].peer_id),
1081                 (delta < 0) ? "offline" : "online");
1082
1083     entry = make_oplist_entry ();
1084     entry->delta = delta;
1085     entry->index = j;
1086     entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
1087                                                     testbed_peers[j],
1088                                                     "rps",
1089                                                     &churn_cb,
1090                                                     entry,
1091                                                     (delta < 0) ? 0 : 1);
1092   }
1093 }
1094
1095 static void
1096 churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1097 {
1098   unsigned int i;
1099   unsigned int j;
1100   double portion_online;
1101   unsigned int *permut;
1102   double prob_go_offline;
1103   double portion_go_online;
1104   double portion_go_offline;
1105
1106   /* Compute the probability for an online peer to go offline
1107    * this round */
1108   portion_online = num_peers_online * 1.0 / num_peers;
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110               "Portion online: %f\n",
1111               portion_online);
1112   portion_go_online = ((1 - portion_online) * .5 * .66);
1113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114               "Portion that should go online: %f\n",
1115               portion_go_online);
1116   portion_go_offline = (portion_online + portion_go_online) - .75;
1117   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118               "Portion that probably goes offline: %f\n",
1119               portion_go_offline);
1120   prob_go_offline = portion_go_offline / (portion_online * .5);
1121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122               "Probability of a selected online peer to go offline: %f\n",
1123               prob_go_offline);
1124
1125   permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK,
1126                                          (unsigned int) num_peers);
1127
1128   /* Go over 50% randomly chosen peers */
1129   for (i = 0; i < .5 * num_peers; i++)
1130   {
1131     j = permut[i];
1132
1133     /* If online, shut down with certain probability */
1134     if (GNUNET_YES == rps_peers[j].online)
1135     {
1136       cancel_pending_req_rep (&rps_peers[j]);
1137       manage_service_wrapper (i, j, -1, prob_go_offline);
1138     }
1139
1140     /* If offline, restart with certain probability */
1141     else if (GNUNET_NO == rps_peers[j].online)
1142     {
1143       manage_service_wrapper (i, j, 1, 0.66);
1144     }
1145   }
1146
1147   GNUNET_free (permut);
1148
1149   churn_task = GNUNET_SCHEDULER_add_delayed (
1150         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1151         churn,
1152         NULL);
1153 }
1154
1155
1156 /**
1157  * Initialise given RPSPeer
1158  */
1159 static void profiler_init_peer (struct RPSPeer *rps_peer)
1160 {
1161   if (num_peers - 1 == rps_peer->index)
1162     rps_peer->num_ids_to_request = cur_test_run.num_requests;
1163 }
1164
1165
1166 /**
1167  * Callback to call on receipt of a reply
1168  *
1169  * @param cls closure
1170  * @param n number of peers
1171  * @param recv_peers the received peers
1172  */
1173 static void
1174 profiler_reply_handle (void *cls,
1175                       uint64_t n,
1176                       const struct GNUNET_PeerIdentity *recv_peers)
1177 {
1178   struct RPSPeer *rps_peer;
1179   struct RPSPeer *rcv_rps_peer;
1180   char *file_name;
1181   char *file_name_dh;
1182   unsigned int i;
1183   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1184
1185   rps_peer = pending_rep->rps_peer;
1186   file_name = "/tmp/rps/received_ids";
1187   file_name_dh = "/tmp/rps/diehard_input";
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "[%s] got %" PRIu64 " peers:\n",
1190               GNUNET_i2s (rps_peer->peer_id),
1191               n);
1192   for (i = 0; i < n; i++)
1193   {
1194     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195                 "%u: %s\n",
1196                 i,
1197                 GNUNET_i2s (&recv_peers[i]));
1198     tofile (file_name,
1199              "%s\n",
1200              GNUNET_i2s_full (&recv_peers[i]));
1201     rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
1202     tofile (file_name_dh,
1203              "%" PRIu32 "\n",
1204              (uint32_t) rcv_rps_peer->index);
1205   }
1206   /* Find #PendingReply holding the request handle */
1207   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1208                                rps_peer->pending_rep_tail,
1209                                pending_rep);
1210   rps_peer->num_pending_reps--;
1211 }
1212
1213
1214 static void
1215 profiler_cb (struct RPSPeer *rps_peer)
1216 {
1217   /* Start churn */
1218   if (NULL == churn_task)
1219   {
1220     churn_task = GNUNET_SCHEDULER_add_delayed (
1221           GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1222           churn,
1223           NULL);
1224   }
1225
1226   /* Only request peer ids at one peer.
1227    * (It's the before-last because last one is target of the focussed attack.)
1228    */
1229   if (eval_peer == rps_peer)
1230     schedule_missing_requests (rps_peer);
1231 }
1232
1233 /**
1234  * Function called from #profiler_eval with a filename.
1235  *
1236  * @param cls closure
1237  * @param filename complete filename (absolute path)
1238  * @return #GNUNET_OK to continue to iterate,
1239  *  #GNUNET_NO to stop iteration with no error,
1240  *  #GNUNET_SYSERR to abort iteration with error!
1241  */
1242 int
1243 file_name_cb (void *cls, const char *filename)
1244 {
1245   if (NULL != strstr (filename, "sampler_el"))
1246   {
1247     struct RPS_SamplerElement *s_elem;
1248     struct GNUNET_CRYPTO_AuthKey auth_key;
1249     const char *key_char;
1250     uint32_t i;
1251
1252     key_char = filename + 20; /* Length of "/tmp/rps/sampler_el-" */
1253     tofile (filename, "--------------------------\n");
1254
1255     auth_key = string_to_auth_key (key_char);
1256     s_elem = RPS_sampler_elem_create ();
1257     RPS_sampler_elem_set (s_elem, auth_key);
1258
1259     for (i = 0; i < num_peers; i++)
1260     {
1261       RPS_sampler_elem_next (s_elem, &rps_peer_ids[i]);
1262     }
1263   }
1264   return GNUNET_OK;
1265 }
1266
1267 /**
1268  * This is run after the test finished.
1269  *
1270  * Compute all perfect samples.
1271  */
1272 int
1273 profiler_eval (void)
1274 {
1275   /* Compute perfect sample for each sampler element */
1276   if (-1 == GNUNET_DISK_directory_scan ("/tmp/rps/", file_name_cb, NULL))
1277   {
1278     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Scan of directory failed\n");
1279   }
1280
1281   return evaluate ();
1282 }
1283
1284
1285 /***********************************************************************
1286  * /Definition of tests
1287 ***********************************************************************/
1288
1289
1290 /**
1291  * Actual "main" function for the testcase.
1292  *
1293  * @param cls closure
1294  * @param h the run handle
1295  * @param n_peers number of peers in 'peers'
1296  * @param peers handle to peers run in the testbed
1297  * @param links_succeeded the number of overlay link connection attempts that
1298  *          succeeded
1299  * @param links_failed the number of overlay link connection attempts that
1300  *          failed
1301  */
1302 static void
1303 run (void *cls,
1304      struct GNUNET_TESTBED_RunHandle *h,
1305      unsigned int n_peers,
1306      struct GNUNET_TESTBED_Peer **peers,
1307      unsigned int links_succeeded,
1308      unsigned int links_failed)
1309 {
1310   unsigned int i;
1311   struct OpListEntry *entry;
1312
1313   testbed_peers = peers;
1314   num_peers_online = 0;
1315   for (i = 0 ; i < num_peers ; i++)
1316   {
1317     entry = make_oplist_entry ();
1318     entry->index = i;
1319     entry->op = GNUNET_TESTBED_peer_get_information (peers[i],
1320                                                      GNUNET_TESTBED_PIT_IDENTITY,
1321                                                      &info_cb,
1322                                                      entry);
1323   }
1324
1325   GNUNET_assert (num_peers == n_peers);
1326   for (i = 0 ; i < n_peers ; i++)
1327   {
1328     rps_peers[i].index = i;
1329     rps_peers[i].op =
1330       GNUNET_TESTBED_service_connect (&rps_peers[i],
1331                                       peers[i],
1332                                       "rps",
1333                                       &rps_connect_complete_cb,
1334                                       &rps_peers[i],
1335                                       &rps_connect_adapter,
1336                                       &rps_disconnect_adapter,
1337                                       &rps_peers[i]);
1338   }
1339
1340   if (NULL != churn_task)
1341     GNUNET_SCHEDULER_cancel (churn_task);
1342   shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
1343 }
1344
1345
1346 /**
1347  * Entry point for the testcase, sets up the testbed.
1348  *
1349  * @param argc unused
1350  * @param argv unused
1351  * @return 0 on success
1352  */
1353 int
1354 main (int argc, char *argv[])
1355 {
1356   int ret_value;
1357
1358   num_peers = 5;
1359   cur_test_run.name = "test-rps-default";
1360   cur_test_run.init_peer = default_init_peer;
1361   cur_test_run.pre_test = NULL;
1362   cur_test_run.reply_handle = default_reply_handle;
1363   cur_test_run.eval_cb = default_eval_cb;
1364   churn_task = NULL;
1365   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
1366
1367   if (strstr (argv[0], "malicious") != NULL)
1368   {
1369     cur_test_run.pre_test = mal_pre;
1370     cur_test_run.main_test = mal_cb;
1371     cur_test_run.init_peer = mal_init_peer;
1372
1373     if (strstr (argv[0], "_1") != NULL)
1374     {
1375       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 1\n");
1376       cur_test_run.name = "test-rps-malicious_1";
1377       mal_type = 1;
1378     }
1379     else if (strstr (argv[0], "_2") != NULL)
1380     {
1381       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 2\n");
1382       cur_test_run.name = "test-rps-malicious_2";
1383       mal_type = 2;
1384     }
1385     else if (strstr (argv[0], "_3") != NULL)
1386     {
1387       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 3\n");
1388       cur_test_run.name = "test-rps-malicious_3";
1389       mal_type = 3;
1390     }
1391   }
1392
1393   else if (strstr (argv[0], "_single_req") != NULL)
1394   {
1395     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test single request\n");
1396     cur_test_run.name = "test-rps-single-req";
1397     cur_test_run.main_test = single_req_cb;
1398   }
1399
1400   else if (strstr (argv[0], "_delayed_reqs") != NULL)
1401   {
1402     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test delayed requests\n");
1403     cur_test_run.name = "test-rps-delayed-reqs";
1404     cur_test_run.main_test = delay_req_cb;
1405   }
1406
1407   else if (strstr (argv[0], "_seed_big") != NULL)
1408   {
1409     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding (num_peers > GNUNET_SERVER_MAX_MESSAGE_SIZE)\n");
1410     cur_test_run.name = "test-rps-seed-big";
1411     cur_test_run.main_test = seed_big_cb;
1412   }
1413
1414   else if (strstr (argv[0], "_single_peer_seed") != NULL)
1415   {
1416     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding and requesting on a single peer\n");
1417     cur_test_run.name = "test-rps-single-peer-seed";
1418     cur_test_run.main_test = single_peer_seed_cb;
1419   }
1420
1421   else if (strstr (argv[0], "_seed_request") != NULL)
1422   {
1423     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding and requesting on multiple peers\n");
1424     cur_test_run.name = "test-rps-seed-request";
1425     cur_test_run.main_test = seed_req_cb;
1426   }
1427
1428   else if (strstr (argv[0], "_seed") != NULL)
1429   {
1430     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding\n");
1431     cur_test_run.name = "test-rps-seed";
1432     cur_test_run.main_test = seed_cb;
1433     cur_test_run.eval_cb = no_eval;
1434   }
1435
1436   else if (strstr (argv[0], "_req_cancel") != NULL)
1437   {
1438     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n");
1439     cur_test_run.name = "test-rps-req-cancel";
1440     num_peers = 1;
1441     cur_test_run.main_test = req_cancel_cb;
1442     cur_test_run.eval_cb = no_eval;
1443     timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
1444   }
1445
1446   else if (strstr (argv[0], "profiler") != NULL)
1447   {
1448     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
1449     cur_test_run.name = "test-rps-profiler";
1450     num_peers = 10;
1451     mal_type = 3;
1452     cur_test_run.init_peer = profiler_init_peer;
1453     cur_test_run.pre_test = mal_pre;
1454     cur_test_run.main_test = profiler_cb;
1455     cur_test_run.reply_handle = profiler_reply_handle;
1456     cur_test_run.eval_cb = profiler_eval;
1457     cur_test_run.request_interval = 2;
1458     cur_test_run.num_requests = 5;
1459     timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90);
1460
1461     /* 'Clean' directory */
1462     (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
1463     GNUNET_DISK_directory_create ("/tmp/rps/");
1464   }
1465
1466   rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
1467   peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
1468   rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1469   if ( (2 == mal_type) ||
1470        (3 == mal_type))
1471     target_peer = &rps_peer_ids[num_peers - 2];
1472   if (profiler_eval == cur_test_run.eval_cb)
1473     eval_peer = &rps_peers[num_peers - 1];
1474
1475   ok = 1;
1476   (void) GNUNET_TESTBED_test_run (cur_test_run.name,
1477                                   "test_rps.conf",
1478                                   num_peers,
1479                                   0, NULL, NULL,
1480                                   &run, NULL);
1481
1482   ret_value = cur_test_run.eval_cb();
1483   GNUNET_free (rps_peers );
1484   GNUNET_free (rps_peer_ids);
1485   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1486   return ret_value;
1487 }
1488
1489 /* end of test_rps_multipeer.c */