4a4a9ee1ae2305d0e6e378528e00434fc8441d0e
[oweals/gnunet.git] / src / rps / test_rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file rps/test_rps_multipeer.c
22  * @brief Testcase for the random peer sampling service.  Starts
23  *        a peergroup with a given number of peers, then waits to
24  *        receive size pushes/pulls from each peer.  Expects to wait
25  *        for one message from each peer.
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_testbed_service.h"
30
31 #include "gnunet_rps_service.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler_elem.h"
34
35 #include <inttypes.h>
36
37
38 /**
39  * How many peers do we start?
40  */
41 uint32_t num_peers;
42
43 /**
44  * How long do we run the test?
45  */
46 //#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
47 static struct GNUNET_TIME_Relative timeout;
48
49
50 /**
51  * Portion of malicious peers
52  */
53 static double portion = .1;
54
55 /**
56  * Type of malicious peer to test
57  */
58 static unsigned int mal_type = 0;
59
60 /**
61  * Handles to all of the running peers
62  */
63 static struct GNUNET_TESTBED_Peer **testbed_peers;
64
65
66 /**
67  * Operation map entry
68  */
69 struct OpListEntry
70 {
71   /**
72    * DLL next ptr
73    */
74   struct OpListEntry *next;
75
76   /**
77    * DLL prev ptr
78    */
79   struct OpListEntry *prev;
80
81   /**
82    * The testbed operation
83    */
84   struct GNUNET_TESTBED_Operation *op;
85
86   /**
87    * Depending on whether we start or stop NSE service at the peer set this to 1
88    * or -1
89    */
90   int delta;
91
92   /**
93    * Index of the regarding peer
94    */
95   unsigned int index;
96 };
97
98 /**
99  * OpList DLL head
100  */
101 static struct OpListEntry *oplist_head;
102
103 /**
104  * OpList DLL tail
105  */
106 static struct OpListEntry *oplist_tail;
107
108
109 /**
110  * A pending reply: A request was sent and the reply is pending.
111  */
112 struct PendingReply
113 {
114   /**
115    * DLL next,prev ptr
116    */
117   struct PendingReply *next;
118   struct PendingReply *prev;
119
120   /**
121    * Handle to the request we are waiting for
122    */
123   struct GNUNET_RPS_Request_Handle *req_handle;
124
125   /**
126    * The peer that requested
127    */
128   struct RPSPeer *rps_peer;
129 };
130
131
132 /**
133  * A pending request: A request was not made yet but is scheduled for later.
134  */
135 struct PendingRequest
136 {
137   /**
138    * DLL next,prev ptr
139    */
140   struct PendingRequest *next;
141   struct PendingRequest *prev;
142
143   /**
144    * Handle to the request we are waiting for
145    */
146   struct GNUNET_SCHEDULER_Task *request_task;
147
148   /**
149    * The peer that requested
150    */
151   struct RPSPeer *rps_peer;
152 };
153
154
155 /**
156  * Information we track for each peer.
157  */
158 struct RPSPeer
159 {
160   /**
161    * Index of the peer.
162    */
163   unsigned int index;
164
165   /**
166    * Handle for RPS connect operation.
167    */
168   struct GNUNET_TESTBED_Operation *op;
169
170   /**
171    * Handle to RPS service.
172    */
173   struct GNUNET_RPS_Handle *rps_handle;
174
175   /**
176    * ID of the peer.
177    */
178   struct GNUNET_PeerIdentity *peer_id;
179
180   /**
181    * A request handle to check for an request
182    */
183   //struct GNUNET_RPS_Request_Handle *req_handle;
184
185   /**
186    * Peer on- or offline?
187    */
188   int online;
189
190   /**
191    * Number of Peer IDs to request
192    */
193   unsigned int num_ids_to_request;
194
195   /**
196    * Pending requests DLL
197    */
198   struct PendingRequest *pending_req_head;
199   struct PendingRequest *pending_req_tail;
200
201   /**
202    * Number of pending requests
203    */
204   unsigned int num_pending_reqs;
205
206   /**
207    * Pending replies DLL
208    */
209   struct PendingReply *pending_rep_head;
210   struct PendingReply *pending_rep_tail;
211
212   /**
213    * Number of pending replies
214    */
215   unsigned int num_pending_reps;
216
217   /**
218    * Received PeerIDs
219    */
220   struct GNUNET_PeerIdentity *rec_ids;
221
222   /**
223    * Number of received PeerIDs
224    */
225   unsigned int num_rec_ids;
226 };
227
228
229 /**
230  * Information for all the peers.
231  */
232 static struct RPSPeer *rps_peers;
233
234 /**
235  * Peermap to get the index of a given peer ID quick.
236  */
237 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
238
239 /**
240  * IDs of the peers.
241  */
242 static struct GNUNET_PeerIdentity *rps_peer_ids;
243
244 /**
245  * ID of the targeted peer.
246  */
247 static struct GNUNET_PeerIdentity *target_peer;
248
249 /**
250  * ID of the peer that requests for the evaluation.
251  */
252 static struct RPSPeer *eval_peer;
253
254 /**
255  * Number of online peers.
256  */
257 static unsigned int num_peers_online;
258
259 /**
260  * Return value from 'main'.
261  */
262 static int ok;
263
264
265 /**
266  * Identifier for the churn task that runs periodically
267  */
268 static struct GNUNET_SCHEDULER_Task *churn_task;
269
270
271 /**
272  * Called to initialise the given RPSPeer
273  */
274 typedef void (*InitPeer) (struct RPSPeer *rps_peer);
275
276 /**
277  * Called directly after connecting to the service
278  */
279 typedef void (*PreTest) (void *cls, struct GNUNET_RPS_Handle *h);
280
281 /**
282  * Called from within #rps_connect_complete_cb ()
283  * Executes functions to test the api/service
284  */
285 typedef void (*MainTest) (struct RPSPeer *rps_peer);
286
287 /**
288  * Callback called once the requested random peers are available
289  */
290 typedef void (*ReplyHandle) (void *cls,
291                              uint64_t n,
292                              const struct GNUNET_PeerIdentity *recv_peers);
293
294 /**
295  * Called directly before disconnecting from the service
296  */
297 typedef void (*PostTest) (void *cls, struct GNUNET_RPS_Handle *h);
298
299 /**
300  * Function called after disconnect to evaluate test success
301  */
302 typedef int (*EvaluationCallback) (void);
303
304
305 /**
306  * Structure to define a single test
307  */
308 struct SingleTestRun
309 {
310   /**
311    * Name of the test
312    */
313   char *name;
314
315   /**
316    * Called to initialise peer
317    */
318   InitPeer init_peer;
319
320   /**
321    * Called directly after connecting to the service
322    */
323   PreTest pre_test;
324
325   /**
326    * Function to execute the functions to be tested
327    */
328   MainTest main_test;
329
330   /**
331    * Callback called once the requested peers are available
332    */
333   ReplyHandle reply_handle;
334
335   /**
336    * Called directly before disconnecting from the service
337    */
338   PostTest post_test;
339
340   /**
341    * Function to evaluate the test results
342    */
343   EvaluationCallback eval_cb;
344
345   /**
346    * Request interval
347    */
348   uint32_t request_interval;
349
350   /**
351    * Number of Requests to make.
352    */
353   uint32_t num_requests;
354 } cur_test_run;
355
356 /**
357  * Are we shutting down?
358  */
359 static int in_shutdown;
360
361 /**
362  * Append arguments to file
363  */
364 static void
365 tofile_ (const char *file_name, char *line)
366 {
367   struct GNUNET_DISK_FileHandle *f;
368   /* char output_buffer[512]; */
369   size_t size;
370   /* int size; */
371   size_t size2;
372
373   if (NULL == (f = GNUNET_DISK_file_open (file_name,
374                                           GNUNET_DISK_OPEN_APPEND |
375                                           GNUNET_DISK_OPEN_WRITE |
376                                           GNUNET_DISK_OPEN_CREATE,
377                                           GNUNET_DISK_PERM_USER_READ |
378                                           GNUNET_DISK_PERM_USER_WRITE |
379                                           GNUNET_DISK_PERM_GROUP_READ |
380                                           GNUNET_DISK_PERM_OTHER_READ)))
381   {
382     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
383                 "Not able to open file %s\n",
384                 file_name);
385     return;
386   }
387   /* size = GNUNET_snprintf (output_buffer,
388                           sizeof (output_buffer),
389                           "%llu %s\n",
390                           GNUNET_TIME_absolute_get ().abs_value_us,
391                           line);
392   if (0 > size)
393   {
394     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395                 "Failed to write string to buffer (size: %i)\n",
396                 size);
397     return;
398   } */
399
400   size = strlen (line) * sizeof (char);
401
402   size2 = GNUNET_DISK_file_write (f, line, size);
403   if (size != size2)
404   {
405     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
406                 "Unable to write to file! (Size: %u, size2: %u)\n",
407                 size,
408                 size2);
409     return;
410   }
411
412   if (GNUNET_YES != GNUNET_DISK_file_close (f))
413     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
414                 "Unable to close file\n");
415 }
416
417 /**
418  * This function is used to facilitate writing important information to disk
419  */
420 #define tofile(file_name, ...) do {\
421   char tmp_buf[512];\
422     int size;\
423     size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
424     if (0 > size)\
425       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
426                      "Failed to create tmp_buf\n");\
427     else\
428       tofile_(file_name,tmp_buf);\
429   } while (0);
430
431
432 /**
433  * Write the ids and their according index in the given array to a file 
434  * Unused
435  */
436 /* static void
437 ids_to_file (char *file_name,
438              struct GNUNET_PeerIdentity *peer_ids,
439              unsigned int num_peer_ids)
440 {
441   unsigned int i;
442
443   for (i=0 ; i < num_peer_ids ; i++)
444   {
445     to_file (file_name,
446              "%u\t%s",
447              i,
448              GNUNET_i2s_full (&peer_ids[i]));
449   }
450 } */
451
452 /**
453  * Test the success of a single test
454  */
455 static int
456 evaluate (struct RPSPeer *loc_rps_peers,
457           unsigned int num_loc_rps_peers,
458           unsigned int expected_recv)
459 {
460   unsigned int i;
461   int tmp_ok;
462
463   tmp_ok = (1 == loc_rps_peers[0].num_rec_ids);
464
465   for (i = 0 ; i < num_loc_rps_peers ; i++)
466   {
467     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
468                 "%u. peer [%s] received %u of %u expected peer_ids: %i\n",
469                 i,
470                 GNUNET_i2s (loc_rps_peers[i].peer_id),
471                 loc_rps_peers[i].num_rec_ids,
472                 expected_recv,
473                 (1 == loc_rps_peers[i].num_rec_ids));
474     tmp_ok &= (1 == loc_rps_peers[i].num_rec_ids);
475   }
476   return tmp_ok? 0 : 1;
477 }
478
479
480 /**
481  * Creates an oplist entry and adds it to the oplist DLL
482  */
483 static struct OpListEntry *
484 make_oplist_entry ()
485 {
486   struct OpListEntry *entry;
487
488   entry = GNUNET_new (struct OpListEntry);
489   GNUNET_CONTAINER_DLL_insert_tail (oplist_head, oplist_tail, entry);
490   return entry;
491 }
492
493
494 /**
495  * Task run on timeout to shut everything down.
496  */
497 static void
498 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
499 {
500   unsigned int i;
501
502   in_shutdown = GNUNET_YES;
503   if (NULL != churn_task)
504     GNUNET_SCHEDULER_cancel (churn_task);
505
506   for (i = 0 ; i < num_peers ; i++)
507     GNUNET_TESTBED_operation_done (rps_peers[i].op);
508   GNUNET_SCHEDULER_shutdown ();
509 }
510
511
512 /**
513  * Seed peers.
514  */
515   void
516 seed_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
517 {
518   unsigned int amount;
519   struct RPSPeer *peer = (struct RPSPeer *) cls;
520   unsigned int i;
521
522   // TODO if malicious don't seed mal peers
523   amount = round (.5 * num_peers);
524
525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding peers:\n");
526   for (i = 0 ; i < amount ; i++)
527     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
528                 i,
529                 GNUNET_i2s (&rps_peer_ids[i]));
530
531   GNUNET_RPS_seed_ids (peer->rps_handle, amount, rps_peer_ids);
532 }
533
534
535 /**
536  * Get the id of peer i.
537  */
538   void
539 info_cb (void *cb_cls,
540          struct GNUNET_TESTBED_Operation *op,
541          const struct GNUNET_TESTBED_PeerInformation *pinfo,
542          const char *emsg)
543 {
544   struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
545
546   if (NULL == pinfo || NULL != emsg)
547   {
548     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg);
549     return;
550   }
551
552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553               "Peer %u is %s\n",
554               entry->index,
555               GNUNET_i2s (pinfo->result.id));
556
557   rps_peer_ids[entry->index] = *(pinfo->result.id);
558   rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index];
559   rps_peers[entry->index].rec_ids = NULL;
560   rps_peers[entry->index].num_rec_ids = 0;
561
562   GNUNET_CONTAINER_multipeermap_put (peer_map,
563       &rps_peer_ids[entry->index],
564       &rps_peers[entry->index],
565       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
566   tofile ("/tmp/rps/peer_ids",
567            "%u\t%s\n",
568            entry->index,
569            GNUNET_i2s_full (&rps_peer_ids[entry->index]));
570
571   if (NULL != cur_test_run.init_peer)
572     cur_test_run.init_peer (&rps_peers[entry->index]);
573
574   GNUNET_TESTBED_operation_done (entry->op);
575   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
576   GNUNET_free (entry);
577 }
578
579
580 /**
581  * Callback to be called when RPS service connect operation is completed
582  *
583  * @param cls the callback closure from functions generating an operation
584  * @param op the operation that has been finished
585  * @param ca_result the RPS service handle returned from rps_connect_adapter
586  * @param emsg error message in case the operation has failed; will be NULL if
587  *          operation has executed successfully.
588  */
589 static void
590 rps_connect_complete_cb (void *cls,
591                          struct GNUNET_TESTBED_Operation *op,
592                          void *ca_result,
593                          const char *emsg)
594 {
595   struct RPSPeer *rps_peer = cls;
596   struct GNUNET_RPS_Handle *rps = ca_result;
597
598   rps_peer->rps_handle = rps;
599   rps_peer->online = GNUNET_YES;
600   num_peers_online++;
601
602   GNUNET_assert (op == rps_peer->op);
603   if (NULL != emsg)
604   {
605     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
606                 "Failed to connect to RPS service: %s\n",
607                 emsg);
608     ok = 1;
609     GNUNET_SCHEDULER_shutdown ();
610     return;
611   }
612
613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Started client successfully\n");
614
615   cur_test_run.main_test (rps_peer);
616 }
617
618
619 /**
620  * Adapter function called to establish a connection to
621  * the RPS service.
622  *
623  * @param cls closure
624  * @param cfg configuration of the peer to connect to; will be available until
625  *          GNUNET_TESTBED_operation_done() is called on the operation returned
626  *          from GNUNET_TESTBED_service_connect()
627  * @return service handle to return in 'op_result', NULL on error
628  */
629 static void *
630 rps_connect_adapter (void *cls,
631                                  const struct GNUNET_CONFIGURATION_Handle *cfg)
632 {
633   struct GNUNET_RPS_Handle *h;
634
635   h = GNUNET_RPS_connect (cfg);
636
637   if (NULL != cur_test_run.pre_test)
638     cur_test_run.pre_test (cls, h);
639
640   return h;
641 }
642
643
644 /**
645  * Adapter function called to destroy connection to
646  * RPS service.
647  *
648  * @param cls closure
649  * @param op_result service handle returned from the connect adapter
650  */
651 static void
652 rps_disconnect_adapter (void *cls,
653                         void *op_result)
654 {
655   struct GNUNET_RPS_Handle *h = op_result;
656   GNUNET_RPS_disconnect (h);
657 }
658
659
660 /***********************************************************************
661  * Definition of tests
662 ***********************************************************************/
663
664 // TODO check whether tests can be stopped earlier
665 static int
666 default_eval_cb (void)
667 {
668   return evaluate (rps_peers, num_peers, 1);
669 }
670
671 static int
672 no_eval (void)
673 {
674   return 0;
675 }
676
677 /**
678  * Initialise given RPSPeer
679  */
680 static void default_init_peer (struct RPSPeer *rps_peer)
681 {
682   rps_peer->num_ids_to_request = 1;
683 }
684
685 /**
686  * Callback to call on receipt of a reply
687  *
688  * @param cls closure
689  * @param n number of peers
690  * @param recv_peers the received peers
691  */
692 static void
693 default_reply_handle (void *cls,
694                       uint64_t n,
695                       const struct GNUNET_PeerIdentity *recv_peers)
696 {
697   struct RPSPeer *rps_peer;
698   struct PendingReply *pending_rep = (struct PendingReply *) cls;
699   unsigned int i;
700
701   rps_peer = pending_rep->rps_peer;
702   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
703                                rps_peer->pending_rep_tail,
704                                pending_rep);
705   rps_peer->num_pending_reps--;
706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707               "[%s] got %" PRIu64 " peers:\n",
708               GNUNET_i2s (rps_peer->peer_id),
709               n);
710   
711   for (i = 0 ; i < n ; i++)
712   {
713     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
714                 "%u: %s\n",
715                 i,
716                 GNUNET_i2s (&recv_peers[i]));
717
718     GNUNET_array_append (rps_peer->rec_ids, rps_peer->num_rec_ids, recv_peers[i]);
719   }
720 }
721
722 /**
723  * Request random peers.
724  */
725 static void
726 request_peers (void *cls,
727                const struct GNUNET_SCHEDULER_TaskContext *tc)
728 {
729   struct RPSPeer *rps_peer;
730   struct PendingRequest *pending_req = (struct PendingRequest *) cls;
731   struct PendingReply *pending_rep;
732
733   if (GNUNET_YES == in_shutdown)
734     return;
735   rps_peer = pending_req->rps_peer;
736   GNUNET_assert (1 <= rps_peer->num_pending_reqs);
737   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
738                                rps_peer->pending_req_tail,
739                                pending_req);
740   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741               "Requesting one peer\n");
742   pending_rep = GNUNET_new (struct PendingReply);
743   pending_rep->rps_peer = rps_peer;
744   pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
745       1,
746       cur_test_run.reply_handle,
747       pending_rep);
748   GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
749                                     rps_peer->pending_rep_tail,
750                                     pending_rep);
751   rps_peer->num_pending_reps++;
752   rps_peer->num_pending_reqs--;
753 }
754
755 static void
756 cancel_pending_req (struct PendingRequest *pending_req)
757 {
758   struct RPSPeer *rps_peer;
759
760   rps_peer = pending_req->rps_peer;
761   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
762                                rps_peer->pending_req_tail,
763                                pending_req);
764   rps_peer->num_pending_reqs--;
765   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766               "Cancelling pending request\n");
767   GNUNET_SCHEDULER_cancel (pending_req->request_task);
768   GNUNET_free (pending_req);
769 }
770
771 static void
772 cancel_request (struct PendingReply *pending_rep)
773 {
774   struct RPSPeer *rps_peer;
775
776   rps_peer = pending_rep->rps_peer;
777   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
778                                rps_peer->pending_rep_tail,
779                                pending_rep);
780   rps_peer->num_pending_reps--;
781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782               "Cancelling request\n");
783   GNUNET_RPS_request_cancel (pending_rep->req_handle);
784   GNUNET_free (pending_rep);
785 }
786
787 /**
788  * Cancel a request.
789  */
790 static void
791 cancel_request_cb (void *cls,
792                 const struct GNUNET_SCHEDULER_TaskContext *tc)
793 {
794   struct PendingReply *pending_rep;
795   struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
796
797   if (GNUNET_YES == in_shutdown)
798     return;
799   pending_rep = rps_peer->pending_rep_head;
800   GNUNET_assert (1 <= rps_peer->num_pending_reps);
801   cancel_request (pending_rep);
802 }
803
804
805 /**
806  * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
807  * issued, nor replied
808  */
809 void
810 schedule_missing_requests (struct RPSPeer *rps_peer)
811 {
812   unsigned int i;
813   struct PendingRequest *pending_req;
814
815   for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
816        i < rps_peer->num_ids_to_request; i++)
817   {
818     pending_req = GNUNET_new (struct PendingRequest);
819     pending_req->rps_peer = rps_peer;
820     pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
821         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
822           cur_test_run.request_interval * i),
823         request_peers,
824         pending_req);
825     GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
826                                       rps_peer->pending_req_tail,
827                                       pending_req);
828     rps_peer->num_pending_reqs++;
829   }
830 }
831
832 void
833 cancel_pending_req_rep (struct RPSPeer *rps_peer)
834 {
835   while (NULL != rps_peer->pending_req_head)
836     cancel_pending_req (rps_peer->pending_req_head);
837   GNUNET_assert (0 == rps_peer->num_pending_reqs);
838   while (NULL != rps_peer->pending_rep_head)
839     cancel_request (rps_peer->pending_rep_head);
840   GNUNET_assert (0 == rps_peer->num_pending_reps);
841 }
842
843 /***********************************
844  * MALICIOUS
845 ***********************************/
846 static void
847 mal_pre (void *cls, struct GNUNET_RPS_Handle *h)
848 {
849   #ifdef ENABLE_MALICIOUS
850   uint32_t num_mal_peers;
851   struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
852
853   GNUNET_assert ( (1 >= portion) &&
854                   (0 <  portion) );
855   num_mal_peers = round (portion * num_peers);
856
857   if (rps_peer->index < num_mal_peers)
858   {
859     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
860                 "%u. peer [%s] of %" PRIu32 " malicious peers turning malicious\n",
861                 rps_peer->index,
862                 GNUNET_i2s (rps_peer->peer_id),
863                 num_mal_peers);
864
865     GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
866                               rps_peer_ids, target_peer);
867   }
868   #endif /* ENABLE_MALICIOUS */
869 }
870
871 static void
872 mal_cb (struct RPSPeer *rps_peer)
873 {
874   uint32_t num_mal_peers;
875
876   #ifdef ENABLE_MALICIOUS
877   GNUNET_assert ( (1 >= portion) &&
878                   (0 <  portion) );
879   num_mal_peers = round (portion * num_peers);
880
881   if (rps_peer->index >= num_mal_peers)
882   { /* It's useless to ask a malicious peer about a random sample -
883        it's not sampling */
884     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
885                                   seed_peers, rps_peer);
886     schedule_missing_requests (rps_peer);
887   }
888   #endif /* ENABLE_MALICIOUS */
889 }
890
891 static int
892 mal_eval (void)
893 {
894   unsigned int num_mal_peers;
895
896   num_mal_peers = round (num_peers * portion);
897   return evaluate (&rps_peers[num_mal_peers],
898                    num_peers - (num_mal_peers),
899                    1);
900 }
901
902
903 /***********************************
904  * SINGLE_REQUEST
905 ***********************************/
906 static void
907 single_req_cb (struct RPSPeer *rps_peer)
908 {
909   schedule_missing_requests (rps_peer);
910 }
911
912 /***********************************
913  * DELAYED_REQUESTS
914 ***********************************/
915 static void
916 delay_req_cb (struct RPSPeer *rps_peer)
917 {
918   schedule_missing_requests (rps_peer);
919 }
920
921 /***********************************
922  * SEED
923 ***********************************/
924 static void
925 seed_cb (struct RPSPeer *rps_peer)
926 {
927   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
928                                 seed_peers, rps_peer);
929 }
930
931 /***********************************
932  * SEED_BIG
933 ***********************************/
934 static void
935 seed_big_cb (struct RPSPeer *rps_peer)
936 {
937   // TODO test seeding > GNUNET_SERVER_MAX_MESSAGE_SIZE peers
938 }
939
940 /***********************************
941  * SINGLE_PEER_SEED
942 ***********************************/
943 static void
944 single_peer_seed_cb (struct RPSPeer *rps_peer)
945 {
946   // TODO
947 }
948
949 /***********************************
950  * SEED_REQUEST
951 ***********************************/
952 static void
953 seed_req_cb (struct RPSPeer *rps_peer)
954 {
955   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
956                                 seed_peers, rps_peer);
957   schedule_missing_requests (rps_peer);
958 }
959
960 //TODO start big mal
961
962 /***********************************
963  * REQUEST_CANCEL
964 ***********************************/
965 static void
966 req_cancel_cb (struct RPSPeer *rps_peer)
967 {
968   schedule_missing_requests (rps_peer);
969   GNUNET_SCHEDULER_add_delayed (
970       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
971                                      (cur_test_run.request_interval + 1)),
972       cancel_request_cb, rps_peer);
973 }
974
975 /***********************************
976  * PROFILER
977 ***********************************/
978
979 /**
980  * Callback to be called when RPS service is started or stopped at peers
981  *
982  * @param cls NULL
983  * @param op the operation handle
984  * @param emsg NULL on success; otherwise an error description
985  */
986 static void
987 churn_cb (void *cls,
988           struct GNUNET_TESTBED_Operation *op,
989           const char *emsg)
990 {
991   // FIXME
992   struct OpListEntry *entry = cls;
993
994   GNUNET_TESTBED_operation_done (entry->op);
995   if (NULL != emsg)
996   {
997     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n");
998     GNUNET_SCHEDULER_shutdown ();
999     return;
1000   }
1001   GNUNET_assert (0 != entry->delta);
1002
1003   num_peers_online += entry->delta;
1004
1005   if (0 > entry->delta)
1006   { /* Peer hopefully just went offline */
1007     if (GNUNET_YES != rps_peers[entry->index].online)
1008     {
1009       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1010                   "peer %s was expected to go offline but is still marked as online\n",
1011                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1012       GNUNET_break (0);
1013     }
1014     else
1015     {
1016       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017                   "peer %s probably went offline as expected\n",
1018                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1019     }
1020     rps_peers[entry->index].online = GNUNET_NO;
1021   }
1022
1023   else if (0 < entry->delta)
1024   { /* Peer hopefully just went online */
1025     if (GNUNET_NO != rps_peers[entry->index].online)
1026     {
1027       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1028                   "peer %s was expected to go online but is still marked as offline\n",
1029                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1030       GNUNET_break (0);
1031     }
1032     else
1033     {
1034       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035                   "peer %s probably went online as expected\n",
1036                   GNUNET_i2s (rps_peers[entry->index].peer_id));
1037       if (NULL != cur_test_run.pre_test)
1038       {
1039         cur_test_run.pre_test (&rps_peers[entry->index],
1040             rps_peers[entry->index].rps_handle);
1041         schedule_missing_requests (&rps_peers[entry->index]);
1042       }
1043     }
1044     rps_peers[entry->index].online = GNUNET_YES;
1045   }
1046
1047   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
1048   GNUNET_free (entry);
1049   //if (num_peers_in_round[current_round] == peers_running)
1050   //  run_round ();
1051 }
1052
1053 static void
1054 manage_service_wrapper (unsigned int i, unsigned int j, int delta,
1055     double prob_go_on_off)
1056 {
1057   struct OpListEntry *entry;
1058   uint32_t prob;
1059
1060   prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1061                                    UINT32_MAX);
1062   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1063               "%u. selected peer (%u: %s) is %s.\n",
1064               i,
1065               j,
1066               GNUNET_i2s (rps_peers[j].peer_id),
1067               (delta < 0)? "online" : "offline");
1068   if (prob < prob_go_on_off * UINT32_MAX)
1069   {
1070     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071                 "%s goes %s\n",
1072                 GNUNET_i2s (rps_peers[j].peer_id),
1073                 (delta < 0) ? "offline" : "online");
1074
1075     entry = make_oplist_entry ();
1076     entry->delta = delta;
1077     entry->index = j;
1078     entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
1079                                                     testbed_peers[j],
1080                                                     "rps",
1081                                                     &churn_cb,
1082                                                     entry,
1083                                                     (delta < 0) ? 0 : 1);
1084   }
1085 }
1086
1087 static void
1088 churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1089 {
1090   unsigned int i;
1091   unsigned int j;
1092   double portion_online;
1093   unsigned int *permut;
1094   double prob_go_offline;
1095   double portion_go_online;
1096   double portion_go_offline;
1097
1098   /* Compute the probability for an online peer to go offline
1099    * this round */
1100   portion_online = num_peers_online * 1.0 / num_peers;
1101   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1102               "Portion online: %f\n",
1103               portion_online);
1104   portion_go_online = ((1 - portion_online) * .5 * .66);
1105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1106               "Portion that should go online: %f\n",
1107               portion_go_online);
1108   portion_go_offline = (portion_online + portion_go_online) - .75;
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110               "Portion that probably goes offline: %f\n",
1111               portion_go_offline);
1112   prob_go_offline = portion_go_offline / (portion_online * .5);
1113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114               "Probability of a selected online peer to go offline: %f\n",
1115               prob_go_offline);
1116
1117   permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK,
1118                                          (unsigned int) num_peers);
1119
1120   /* Go over 50% randomly chosen peers */
1121   for (i = 0; i < .5 * num_peers; i++)
1122   {
1123     j = permut[i];
1124
1125     /* If online, shut down with certain probability */
1126     if (GNUNET_YES == rps_peers[j].online)
1127     {
1128       cancel_pending_req_rep (&rps_peers[j]);
1129       manage_service_wrapper (i, j, -1, prob_go_offline);
1130     }
1131
1132     /* If offline, restart with certain probability */
1133     else if (GNUNET_NO == rps_peers[j].online)
1134     {
1135       manage_service_wrapper (i, j, 1, 0.66);
1136     }
1137   }
1138
1139   GNUNET_free (permut);
1140
1141   churn_task = GNUNET_SCHEDULER_add_delayed (
1142         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
1143         churn,
1144         NULL);
1145 }
1146
1147
1148 /**
1149  * Initialise given RPSPeer
1150  */
1151 static void profiler_init_peer (struct RPSPeer *rps_peer)
1152 {
1153   if (num_peers - 1 == rps_peer->index)
1154     rps_peer->num_ids_to_request = cur_test_run.num_requests;
1155 }
1156
1157
1158 /**
1159  * Callback to call on receipt of a reply
1160  *
1161  * @param cls closure
1162  * @param n number of peers
1163  * @param recv_peers the received peers
1164  */
1165 static void
1166 profiler_reply_handle (void *cls,
1167                       uint64_t n,
1168                       const struct GNUNET_PeerIdentity *recv_peers)
1169 {
1170   struct RPSPeer *rps_peer;
1171   struct RPSPeer *rcv_rps_peer;
1172   char *file_name;
1173   char *file_name_dh;
1174   unsigned int i;
1175   struct PendingReply *pending_rep = (struct PendingReply *) cls;
1176
1177   rps_peer = pending_rep->rps_peer;
1178   file_name = "/tmp/rps/received_ids";
1179   file_name_dh = "/tmp/rps/diehard_input";
1180   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181               "[%s] got %" PRIu64 " peers:\n",
1182               GNUNET_i2s (rps_peer->peer_id),
1183               n);
1184   for (i = 0; i < n; i++)
1185   {
1186     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187                 "%u: %s\n",
1188                 i,
1189                 GNUNET_i2s (&recv_peers[i]));
1190     tofile (file_name,
1191              "%s\n",
1192              GNUNET_i2s_full (&recv_peers[i]));
1193     rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
1194     tofile (file_name_dh,
1195              "%" PRIu32 "\n",
1196              (uint32_t) rcv_rps_peer->index);
1197   }
1198   /* Find #PendingReply holding the request handle */
1199   GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1200                                rps_peer->pending_rep_tail,
1201                                pending_rep);
1202   rps_peer->num_pending_reps--;
1203 }
1204
1205
1206 static void
1207 profiler_cb (struct RPSPeer *rps_peer)
1208 {
1209   /* Start churn */
1210   if (NULL == churn_task)
1211   {
1212     churn_task = GNUNET_SCHEDULER_add_delayed (
1213           GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1214           churn,
1215           NULL);
1216   }
1217
1218   /* Only request peer ids at one peer.
1219    * (It's the before-last because last one is target of the focussed attack.)
1220    */
1221   if (eval_peer == rps_peer)
1222     schedule_missing_requests (rps_peer);
1223 }
1224
1225 /**
1226  * Function called from #profiler_eval with a filename.
1227  *
1228  * @param cls closure
1229  * @param filename complete filename (absolute path)
1230  * @return #GNUNET_OK to continue to iterate,
1231  *  #GNUNET_NO to stop iteration with no error,
1232  *  #GNUNET_SYSERR to abort iteration with error!
1233  */
1234 int
1235 file_name_cb (void *cls, const char *filename)
1236 {
1237   if (NULL != strstr (filename, "sampler_el"))
1238   {
1239     struct RPS_SamplerElement *s_elem;
1240     struct GNUNET_CRYPTO_AuthKey auth_key;
1241     const char *key_char;
1242     uint32_t i;
1243
1244     key_char = filename + 20; /* Length of "/tmp/rps/sampler_el-" */
1245     tofile (filename, "--------------------------\n");
1246
1247     auth_key = string_to_auth_key (key_char);
1248     s_elem = RPS_sampler_elem_create ();
1249     RPS_sampler_elem_set (s_elem, auth_key);
1250
1251     for (i = 0; i < num_peers; i++)
1252     {
1253       RPS_sampler_elem_next (s_elem, &rps_peer_ids[i]);
1254     }
1255   }
1256   return GNUNET_OK;
1257 }
1258
1259 /**
1260  * This is run after the test finished.
1261  *
1262  * Compute all perfect samples.
1263  */
1264 int
1265 profiler_eval (void)
1266 {
1267   /* Compute perfect sample for each sampler element */
1268   if (-1 == GNUNET_DISK_directory_scan ("/tmp/rps/", file_name_cb, NULL))
1269   {
1270     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Scan of directory failed\n");
1271   }
1272
1273   return 0;
1274 }
1275
1276
1277 /***********************************************************************
1278  * /Definition of tests
1279 ***********************************************************************/
1280
1281
1282 /**
1283  * Actual "main" function for the testcase.
1284  *
1285  * @param cls closure
1286  * @param h the run handle
1287  * @param n_peers number of peers in 'peers'
1288  * @param peers handle to peers run in the testbed
1289  * @param links_succeeded the number of overlay link connection attempts that
1290  *          succeeded
1291  * @param links_failed the number of overlay link connection attempts that
1292  *          failed
1293  */
1294 static void
1295 run (void *cls,
1296      struct GNUNET_TESTBED_RunHandle *h,
1297      unsigned int n_peers,
1298      struct GNUNET_TESTBED_Peer **peers,
1299      unsigned int links_succeeded,
1300      unsigned int links_failed)
1301 {
1302   unsigned int i;
1303   struct OpListEntry *entry;
1304
1305   testbed_peers = peers;
1306   num_peers_online = 0;
1307   for (i = 0 ; i < num_peers ; i++)
1308   {
1309     entry = make_oplist_entry ();
1310     entry->index = i;
1311     entry->op = GNUNET_TESTBED_peer_get_information (peers[i],
1312                                                      GNUNET_TESTBED_PIT_IDENTITY,
1313                                                      &info_cb,
1314                                                      entry);
1315   }
1316
1317   GNUNET_assert (num_peers == n_peers);
1318   for (i = 0 ; i < n_peers ; i++)
1319   {
1320     rps_peers[i].index = i;
1321     rps_peers[i].op =
1322       GNUNET_TESTBED_service_connect (&rps_peers[i],
1323                                       peers[i],
1324                                       "rps",
1325                                       &rps_connect_complete_cb,
1326                                       &rps_peers[i],
1327                                       &rps_connect_adapter,
1328                                       &rps_disconnect_adapter,
1329                                       &rps_peers[i]);
1330   }
1331
1332   if (NULL != churn_task)
1333     GNUNET_SCHEDULER_cancel (churn_task);
1334   GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_task, NULL);
1335 }
1336
1337
1338 /**
1339  * Entry point for the testcase, sets up the testbed.
1340  *
1341  * @param argc unused
1342  * @param argv unused
1343  * @return 0 on success
1344  */
1345 int
1346 main (int argc, char *argv[])
1347 {
1348   int ret_value;
1349
1350   num_peers = 5;
1351   cur_test_run.name = "test-rps-default";
1352   cur_test_run.init_peer = default_init_peer;
1353   cur_test_run.pre_test = NULL;
1354   cur_test_run.reply_handle = default_reply_handle;
1355   cur_test_run.eval_cb = default_eval_cb;
1356   churn_task = NULL;
1357   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
1358
1359   if (strstr (argv[0], "malicious") != NULL)
1360   {
1361     cur_test_run.pre_test = mal_pre;
1362     cur_test_run.main_test = mal_cb;
1363     cur_test_run.eval_cb = mal_eval;
1364
1365     if (strstr (argv[0], "_1") != NULL)
1366     {
1367       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 1\n");
1368       cur_test_run.name = "test-rps-malicious_1";
1369       mal_type = 1;
1370     }
1371     else if (strstr (argv[0], "_2") != NULL)
1372     {
1373       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 2\n");
1374       cur_test_run.name = "test-rps-malicious_2";
1375       mal_type = 2;
1376     }
1377     else if (strstr (argv[0], "_3") != NULL)
1378     {
1379       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 3\n");
1380       cur_test_run.name = "test-rps-malicious_3";
1381       mal_type = 3;
1382     }
1383   }
1384
1385   else if (strstr (argv[0], "_single_req") != NULL)
1386   {
1387     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test single request\n");
1388     cur_test_run.name = "test-rps-single-req";
1389     cur_test_run.main_test = single_req_cb;
1390   }
1391
1392   else if (strstr (argv[0], "_delayed_reqs") != NULL)
1393   {
1394     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test delayed requests\n");
1395     cur_test_run.name = "test-rps-delayed-reqs";
1396     cur_test_run.main_test = delay_req_cb;
1397   }
1398
1399   else if (strstr (argv[0], "_seed_big") != NULL)
1400   {
1401     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding (num_peers > GNUNET_SERVER_MAX_MESSAGE_SIZE)\n");
1402     cur_test_run.name = "test-rps-seed-big";
1403     cur_test_run.main_test = seed_big_cb;
1404   }
1405
1406   else if (strstr (argv[0], "_single_peer_seed") != NULL)
1407   {
1408     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding and requesting on a single peer\n");
1409     cur_test_run.name = "test-rps-single-peer-seed";
1410     cur_test_run.main_test = single_peer_seed_cb;
1411   }
1412
1413   else if (strstr (argv[0], "_seed_request") != NULL)
1414   {
1415     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding and requesting on multiple peers\n");
1416     cur_test_run.name = "test-rps-seed-request";
1417     cur_test_run.main_test = seed_req_cb;
1418   }
1419
1420   else if (strstr (argv[0], "_seed") != NULL)
1421   {
1422     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding\n");
1423     cur_test_run.name = "test-rps-seed";
1424     cur_test_run.main_test = seed_cb;
1425     cur_test_run.eval_cb = no_eval;
1426   }
1427
1428   else if (strstr (argv[0], "_req_cancel") != NULL)
1429   {
1430     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n");
1431     cur_test_run.name = "test-rps-req-cancel";
1432     num_peers = 1;
1433     cur_test_run.main_test = req_cancel_cb;
1434     cur_test_run.eval_cb = no_eval;
1435   }
1436
1437   else if (strstr (argv[0], "profiler") != NULL)
1438   {
1439     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
1440     cur_test_run.name = "test-rps-profiler";
1441     num_peers = 10;
1442     mal_type = 3;
1443     cur_test_run.init_peer = profiler_init_peer;
1444     cur_test_run.pre_test = mal_pre;
1445     cur_test_run.main_test = profiler_cb;
1446     cur_test_run.reply_handle = profiler_reply_handle;
1447     cur_test_run.eval_cb = profiler_eval;
1448     cur_test_run.request_interval = 2;
1449     cur_test_run.num_requests = 5;
1450     timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90);
1451
1452     /* 'Clean' directory */
1453     (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
1454     GNUNET_DISK_directory_create ("/tmp/rps/");
1455   }
1456
1457   rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
1458   peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
1459   rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1460   if ( (2 == mal_type) ||
1461        (3 == mal_type))
1462     target_peer = &rps_peer_ids[num_peers - 2];
1463   if (profiler_eval == cur_test_run.eval_cb)
1464     eval_peer = &rps_peers[num_peers - 1];
1465
1466   ok = 1;
1467   (void) GNUNET_TESTBED_test_run (cur_test_run.name,
1468                                   "test_rps.conf",
1469                                   num_peers,
1470                                   0, NULL, NULL,
1471                                   &run, NULL);
1472
1473   ret_value = cur_test_run.eval_cb();
1474   GNUNET_free (rps_peers );
1475   GNUNET_free (rps_peer_ids);
1476   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1477   return ret_value;
1478 }
1479
1480 /* end of test_rps_multipeer.c */