implemented seeding
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_nse_service.h"
30 #include "rps.h"
31
32 #include "gnunet-service-rps_sampler.h"
33
34 #include <math.h>
35 #include <inttypes.h>
36
37 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
38
39 // TODO modify @brief in every file
40
41 // TODO take care that messages are not longer than 64k
42
43 // TODO check for overflows
44
45 // TODO align message structs
46
47 // (TODO api -- possibility of getting weak random peer immideately)
48
49 // TODO malicious peer
50
51 // TODO Change API to accept initialisation peers
52
53 // TODO Change API to accept good peers 'friends'
54
55 // TODO store peers somewhere
56
57 // TODO check that every id we get is valid - is it reachable?
58
59 // TODO ignore list
60
61 // hist_size_init, hist_size_max
62
63 /**
64  * Our configuration.
65  */
66 static const struct GNUNET_CONFIGURATION_Handle *cfg;
67
68 /**
69  * Our own identity.
70  */
71 static struct GNUNET_PeerIdentity *own_identity;
72
73 /**
74  * Closure to the callback cadet calls on each peer it passes to us
75  */
76 struct init_peer_cls
77 {
78   /**
79    * The server handle to later listen to client requests
80    */
81   struct GNUNET_SERVER_Handle *server;
82
83   /**
84    * Counts how many peers cadet already passed to us
85    */
86   uint32_t i;
87 };
88
89
90   struct GNUNET_PeerIdentity *
91 get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size);
92
93
94 /***********************************************************************
95  * Housekeeping with peers
96 ***********************************************************************/
97
98 /**
99  * Struct used to store the context of a connected client.
100  */
101 struct client_ctx
102 {
103   /**
104    * The message queue to communicate with the client.
105    */
106   struct GNUNET_MQ_Handle *mq;
107 };
108
109 /**
110  * Used to keep track in what lists single peerIDs are.
111  */
112 enum in_list_flag // probably unneeded
113 {
114   in_other_sampler_list = 0x1,
115   in_other_gossip_list  = 0x2, // unneeded?
116   in_own_sampler_list   = 0x4,
117   in_own_gossip_list    = 0x8 // unneeded?
118 };
119
120 /**
121  * Struct used to keep track of other peer's status
122  *
123  * This is stored in a multipeermap.
124  */
125 struct peer_context
126 {
127   /**
128    * In own gossip/sampler list, in other's gossip/sampler list
129    */
130   uint32_t in_flags; // unneeded?
131
132   /**
133    * Message queue open to client
134    */
135   struct GNUNET_MQ_Handle *mq;
136
137   /**
138    * Channel open to client.
139    */
140   struct GNUNET_CADET_Channel *to_channel;
141
142   /**
143    * Channel open from client.
144    */
145   struct GNUNET_CADET_Channel *from_channel; // unneeded
146
147   /**
148    * This is pobably followed by 'statistical' data (when we first saw
149    * him, how did we get his ID, how many pushes (in a timeinterval),
150    * ...)
151    */
152 };
153
154 /***********************************************************************
155  * /Housekeeping with peers
156 ***********************************************************************/
157
158 /***********************************************************************
159  * Globals
160 ***********************************************************************/
161
162 /**
163  * Set of all peers to keep track of them.
164  */
165 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
166
167
168 /**
169  * The gossiped list of peers.
170  */
171 static struct GNUNET_PeerIdentity *gossip_list;
172
173 /**
174  * Size of the gossiped list
175  */
176 static unsigned int gossip_list_size;
177
178
179 /**
180  * The size Brahms needs according to the network size.
181  *
182  * This is directly taken as the #gossip_list_size on update of the
183  * #gossip_list
184  * This is the minimum size the sampler grows to.
185  */
186 static unsigned int sampler_size;
187 //size_t sampler_size;
188
189 /**
190  * The size of sampler we need to be able to satisfy the client's need of
191  * random peers.
192  */
193 //static unsigned int sampler_size_client_need;
194
195
196 /**
197  * Percentage of total peer number in the gossip list
198  * to send random PUSHes to
199  *
200  * TODO do not read from configuration
201  */
202 static float alpha;
203
204 /**
205  * Percentage of total peer number in the gossip list
206  * to send random PULLs to
207  *
208  * TODO do not read from configuration
209  */
210 static float beta;
211
212 /**
213  * The percentage gamma of history updates.
214  * Simply 1 - alpha - beta
215  */
216
217
218 /**
219  * Identifier for the main task that runs periodically.
220  */
221 static struct GNUNET_SCHEDULER_Task * do_round_task;
222
223 /**
224  * Time inverval the do_round task runs in.
225  */
226 static struct GNUNET_TIME_Relative round_interval;
227
228
229
230 /**
231  * List to store peers received through pushes temporary.
232  *
233  * TODO -> multipeermap
234  */
235 static struct GNUNET_PeerIdentity *push_list;
236
237 /**
238  * Size of the push_list;
239  */
240 static unsigned int push_list_size;
241 //size_t push_list_size;
242
243 /**
244  * List to store peers received through pulls temporary.
245  *
246  * TODO -> multipeermap
247  */
248 static struct GNUNET_PeerIdentity *pull_list;
249
250 /**
251  * Size of the pull_list;
252  */
253 static unsigned int pull_list_size;
254 //size_t pull_list_size;
255
256
257 /**
258  * Handler to NSE.
259  */
260 static struct GNUNET_NSE_Handle *nse;
261
262 /**
263  * Handler to CADET.
264  */
265 static struct GNUNET_CADET_Handle *cadet_handle;
266
267 /**
268  * Global counter
269  */
270 uint64_t g_i = 0;
271
272
273 /**
274  * Request counter.
275  *
276  * Only needed in the beginning to check how many of the 64 deltas
277  * we already have
278  */
279 static unsigned int req_counter;
280
281 /**
282  * Time of the last request we received.
283  *
284  * Used to compute the expected request rate.
285  */
286 static struct GNUNET_TIME_Absolute last_request;
287
288 /**
289  * Size of #request_deltas.
290  */
291 #define REQUEST_DELTAS_SIZE 64
292 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
293
294 /**
295  * Last 64 deltas between requests
296  */
297 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
298
299 /**
300  * The prediction of the rate of requests
301  */
302 static struct GNUNET_TIME_Relative  request_rate;
303
304
305 /***********************************************************************
306  * /Globals
307 ***********************************************************************/
308
309
310 /***********************************************************************
311  * Util functions
312 ***********************************************************************/
313
314 /**
315  * Check if peer is already in peer array.
316  */
317   int
318 in_arr (const struct GNUNET_PeerIdentity *array,
319         unsigned int arr_size,
320         const struct GNUNET_PeerIdentity *peer)
321 {
322   GNUNET_assert (NULL != peer);
323
324   if (0 == arr_size)
325     return GNUNET_NO;
326
327   GNUNET_assert (NULL != array);
328
329   unsigned int i;
330
331   i = 0;
332   while (0 != GNUNET_CRYPTO_cmp_peer_identity (&array[i], peer) &&
333          i < arr_size)
334     i++;
335
336   if (i == arr_size)
337     return GNUNET_NO;
338   else
339     return GNUNET_YES;
340 }
341
342
343 /**
344  * Get random peer from the gossip list.
345  */
346   struct GNUNET_PeerIdentity *
347 get_rand_peer(const struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
348 {
349   uint64_t r_index;
350   struct GNUNET_PeerIdentity *peer;
351
352   peer = GNUNET_new(struct GNUNET_PeerIdentity);
353   // FIXME if we have only NULL in gossip list this will block
354   // but then we might have a problem nevertheless
355
356   do
357   {
358
359     /**;
360      * Choose the r_index of the peer we want to return
361      * at random from the interval of the gossip list
362      */
363     r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
364                                      list_size);
365
366     *peer = peer_list[r_index];
367   } while (NULL == peer);
368
369   return peer;
370 }
371
372
373 /**
374  * Get the context of a peer. If not existing, create.
375  */
376   struct peer_context *
377 get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
378 {
379   struct peer_context *ctx;
380
381   if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
382   {
383     ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
384   }
385   else
386   {
387     ctx = GNUNET_new (struct peer_context);
388     ctx->in_flags = 0;
389     ctx->mq = NULL;
390     ctx->to_channel = NULL;
391     ctx->from_channel = NULL;
392     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
393   }
394   return ctx;
395 }
396
397
398 /**
399  * Get the channel of a peer. If not existing, create.
400  */
401   struct GNUNET_CADET_Channel *
402 get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
403 {
404   struct peer_context *ctx;
405
406   ctx = get_peer_ctx (peer_map, peer);
407   if (NULL == ctx->to_channel)
408   {
409     ctx->to_channel = GNUNET_CADET_channel_create (cadet_handle, NULL, peer,
410                                                    GNUNET_RPS_CADET_PORT,
411                                                    GNUNET_CADET_OPTION_RELIABLE);
412     // do I have to explicitly put it in the peer_map?
413     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
414                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
415   }
416   return ctx->to_channel;
417 }
418
419
420 /**
421  * Get the message queue of a specific peer.
422  *
423  * If we already have a message queue open to this client,
424  * simply return it, otherways create one.
425  */
426   struct GNUNET_MQ_Handle *
427 get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
428 {
429   struct peer_context *ctx;
430
431   ctx = get_peer_ctx (peer_map, peer_id);
432   if (NULL == ctx->mq)
433   {
434     (void) get_channel (peer_map, peer_id);
435     ctx->mq = GNUNET_CADET_mq_create (ctx->to_channel);
436     //do I have to explicitly put it in the peer_map?
437     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer_id, ctx,
438                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
439   }
440   return ctx->mq;
441 }
442
443
444 /**
445  * Sum all time relatives of an array.
446   */
447   struct GNUNET_TIME_Relative
448 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint64_t arr_size)
449 {
450   struct GNUNET_TIME_Relative sum;
451   uint64_t i;
452
453   sum = GNUNET_TIME_UNIT_ZERO;
454   for ( i = 0 ; i < arr_size ; i++ )
455   {
456     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
457   }
458   return sum;
459 }
460
461
462 /**
463  * Compute the average of given time relatives.
464  */
465   struct GNUNET_TIME_Relative
466 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint64_t arr_size)
467 {
468   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size); // FIXME find a way to devide that by arr_size
469 }
470
471
472 /***********************************************************************
473  * /Util functions
474 ***********************************************************************/
475
476 /**
477  * Function called by NSE.
478  *
479  * Updates sizes of sampler list and gossip list and adapt those lists
480  * accordingly.
481  */
482   void
483 nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev)
484 {
485   double estimate;
486   unsigned int old_est;
487   //double scale; // TODO this might go gloabal/config
488
489   old_est = sampler_size;
490
491   LOG (GNUNET_ERROR_TYPE_DEBUG,
492       "Received a ns estimate - logest: %f, std_dev: %f (old_est: %f)\n",
493       logestimate, std_dev, old_est);
494   //scale = .01;
495   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
496   // GNUNET_NSE_log_estimate_to_n (logestimate);
497   estimate = pow (estimate, 1./3);
498   // TODO add if std_dev is a number
499   // estimate += (std_dev * scale);
500   if ( 0 < estimate ) {
501     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
502     sampler_size = estimate;
503   } else
504     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
505
506   /* If the NSE has changed adapt the lists accordingly */
507   // TODO respect the request rate, min, max
508   if (old_est > sampler_size*4)
509   { /* Shrinking */
510     RPS_sampler_resize (old_est/2);
511   }
512   else if (old_est < sampler_size)
513   { /* Growing */
514     if (sampler_size < old_est*2)
515       RPS_sampler_resize (old_est*2);
516     else
517       RPS_sampler_resize (sampler_size);
518   }
519 }
520
521 /**
522  * Handle RPS request from the client.
523  *
524  * @param cls closure
525  * @param client identification of the client
526  * @param message the actual message
527  */
528 static void
529 handle_client_request (void *cls,
530             struct GNUNET_SERVER_Client *client,
531             const struct GNUNET_MessageHeader *message)
532 {
533   LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n");
534
535   struct GNUNET_RPS_CS_RequestMessage *msg;
536   //unsigned int n_arr[sampler_list->size];// =
537     //GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
538   //struct GNUNET_MQ_Handle *mq;
539   struct client_ctx *cli_ctx;
540   struct GNUNET_MQ_Envelope *ev;
541   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
542   uint64_t num_peers;
543   const struct GNUNET_PeerIdentity *peers;
544   //uint64_t i;
545
546
547   /* Estimate request rate */
548   if (request_deltas_size > req_counter)
549     req_counter++;
550   if ( 1 < req_counter)
551   {
552     /* Shift last request deltas to the right */
553     memcpy (&request_deltas[1],
554         request_deltas,
555         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
556     /* Add current delta to beginning */
557     request_deltas[0] = GNUNET_TIME_absolute_get_difference (last_request,
558         GNUNET_TIME_absolute_get ());
559     request_rate = T_relative_avg (request_deltas, req_counter);
560   }
561   last_request = GNUNET_TIME_absolute_get();
562   // TODO resize the size of the extended_samplers
563
564
565   // TODO check message size
566   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
567   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
568   if ( NULL == cli_ctx ) {
569     cli_ctx = GNUNET_new(struct client_ctx);
570     cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
571     GNUNET_SERVER_client_set_user_context (client, cli_ctx);
572   }
573   
574   // How many peers do we give back?
575   // Wait until we have enough random peers?
576
577   num_peers = GNUNET_ntohll (msg->num_peers);
578
579   ev = GNUNET_MQ_msg_extra (out_msg,
580                             num_peers * sizeof (struct GNUNET_PeerIdentity),
581                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
582   out_msg->num_peers = msg->num_peers; // No conversion between network and network order
583
584   //&out_msg[1] = RPS_sampler_get_n_rand_peers (num_peers);
585   peers = RPS_sampler_get_n_rand_peers (num_peers);
586   memcpy(&out_msg[1],
587       peers,
588       num_peers * sizeof (struct GNUNET_PeerIdentity));
589   
590   GNUNET_MQ_send (cli_ctx->mq, ev);
591   //GNUNET_MQ_destroy(mq);
592
593   GNUNET_SERVER_receive_done (client,
594                               GNUNET_OK);
595 }
596
597
598 /**
599  * Handle seed from the client.
600  *
601  * @param cls closure
602  * @param client identification of the client
603  * @param message the actual message
604  */
605   static void
606 handle_client_seed (void *cls,
607             struct GNUNET_SERVER_Client *client,
608             const struct GNUNET_MessageHeader *message)
609 {
610   struct GNUNET_RPS_CS_SeedMessage *in_msg;
611   struct GNUNET_PeerIdentity *peers;
612   uint64_t i;
613
614   if (sizeof (struct GNUNET_RPS_CS_SeedMessage) < ntohs (message->size))
615   {
616     GNUNET_break_op (0);
617     GNUNET_SERVER_receive_done (client,
618               GNUNET_SYSERR);
619   }
620   in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
621   if (ntohs (message->size) - sizeof (struct GNUNET_RPS_CS_SeedMessage) /
622       sizeof (struct GNUNET_PeerIdentity) != GNUNET_ntohll (in_msg->num_peers))
623   {
624     GNUNET_break_op (0);
625     GNUNET_SERVER_receive_done (client,
626               GNUNET_SYSERR);
627   }
628
629   in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
630   peers = (struct GNUNET_PeerIdentity *) &message[1];
631
632   for ( i = 0 ; i < GNUNET_ntohll (in_msg->num_peers) ; i++ )
633     RPS_sampler_update_list (&peers[i]);
634
635   GNUNET_SERVER_receive_done (client,
636                               GNUNET_OK);
637 }
638
639
640 /**
641  * Handle a PUSH message from another peer.
642  *
643  * Check the proof of work and store the PeerID
644  * in the temporary list for pushed PeerIDs.
645  *
646  * @param cls Closure
647  * @param channel The channel the PUSH was received over
648  * @param channel_ctx The context associated with this channel
649  * @param msg The message header
650  */
651 static int
652 handle_peer_push (void *cls,
653     struct GNUNET_CADET_Channel *channel,
654     void **channel_ctx,
655     const struct GNUNET_MessageHeader *msg)
656 {
657   const struct GNUNET_PeerIdentity *peer;
658
659   // (check the proof of work) 
660   
661   // TODO accept empty message
662   if (ntohs(msg->size) != sizeof (struct GNUNET_RPS_P2P_PushMessage))
663   {
664     GNUNET_break_op (0); // At the moment our own implementation seems to break that.
665     return GNUNET_SYSERR;
666   }
667
668   peer = (const struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
669   // FIXME wait for cadet to change this function
670   LOG (GNUNET_ERROR_TYPE_DEBUG, "PUSH received (%s)\n", GNUNET_i2s (peer));
671   
672   /* Add the sending peer to the push_list */
673   if (GNUNET_NO == in_arr (push_list, pull_list_size, peer))
674     GNUNET_array_append (push_list, push_list_size, *peer);
675
676   return GNUNET_OK;
677 }
678
679 /**
680  * Handle PULL REQUEST request message from another peer.
681  *
682  * Reply with the gossip list of PeerIDs.
683  *
684  * @param cls Closure
685  * @param channel The channel the PUSH was received over
686  * @param channel_ctx The context associated with this channel
687  * @param msg The message header
688  */
689 static int
690 handle_peer_pull_request (void *cls,
691     struct GNUNET_CADET_Channel *channel,
692     void **channel_ctx,
693     const struct GNUNET_MessageHeader *msg)
694 {
695   struct GNUNET_PeerIdentity *peer;
696   struct GNUNET_MQ_Handle *mq;
697   struct GNUNET_MQ_Envelope *ev;
698   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
699
700   // assert that msg->size is 0
701
702   // TODO accept empty message
703   if (ntohs(msg->size) != sizeof (struct GNUNET_RPS_P2P_PullRequestMessage))
704   {
705     GNUNET_break_op (0); // At the moment our own implementation seems to break that.
706     return GNUNET_SYSERR;
707   }
708
709   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
710   // FIXME wait for cadet to change this function
711   LOG (GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s (peer));
712
713   mq = get_mq (peer_map, peer);
714
715   ev = GNUNET_MQ_msg_extra (out_msg,
716                            gossip_list_size * sizeof (struct GNUNET_PeerIdentity),
717                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
718   out_msg->num_peers = GNUNET_htonll (gossip_list_size);
719   memcpy (&out_msg[1], gossip_list,
720          gossip_list_size * sizeof (struct GNUNET_PeerIdentity));
721
722   GNUNET_MQ_send (mq, ev);
723
724   return GNUNET_OK;
725 }
726
727 /**
728  * Handle PULL REPLY message from another peer.
729  *
730  * Check whether we sent a corresponding request and
731  * whether this reply is the first one.
732  *
733  * @param cls Closure
734  * @param channel The channel the PUSH was received over
735  * @param channel_ctx The context associated with this channel
736  * @param msg The message header
737  */
738 static int
739 handle_peer_pull_reply (void *cls,
740     struct GNUNET_CADET_Channel *channel,
741     void **channel_ctx,
742     const struct GNUNET_MessageHeader *msg)
743 {
744   LOG (GNUNET_ERROR_TYPE_DEBUG, "PULL REPLY received\n");
745
746   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
747   struct GNUNET_PeerIdentity *peers;
748   uint64_t i;
749
750   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) < ntohs (msg->size))
751   {
752     GNUNET_break_op (0); // At the moment our own implementation seems to break that.
753     return GNUNET_SYSERR;
754   }
755   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
756   if (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) / sizeof (struct GNUNET_PeerIdentity) != GNUNET_ntohll (in_msg->num_peers))
757   {
758     GNUNET_break_op (0);
759     return GNUNET_SYSERR;
760   }
761
762   // TODO check that we sent a request and that it is the first reply
763
764   peers = (struct GNUNET_PeerIdentity *) &msg[1];
765   for ( i = 0 ; i < GNUNET_ntohll (in_msg->num_peers) ; i++ )
766   {
767     if (GNUNET_NO == in_arr(pull_list, pull_list_size, &peers[i]))
768       GNUNET_array_append (pull_list, pull_list_size, peers[i]);
769   }
770
771   // TODO check that id is valid - whether it is reachable
772
773   return GNUNET_OK;
774 }
775
776
777 /**
778  * Send out PUSHes and PULLs.
779  *
780  * This is executed regylary.
781  */
782 static void
783 do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
784 {
785   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round\n");
786
787   uint64_t i;
788   //unsigned int *n_arr;
789   unsigned int n_peers; /* Number of peers we send pushes/pulls to */
790   struct GNUNET_RPS_P2P_PushMessage        *push_msg;
791   struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty message
792   struct GNUNET_MQ_Envelope *ev;
793   const struct GNUNET_PeerIdentity *peer;
794   struct GNUNET_MQ_Handle *mq;
795
796   // TODO print lists, ...
797   // TODO randomise and spread calls herein over time
798
799
800   /* Would it make sense to have one shuffeled gossip list and then
801    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
802    * use the rest to update sampler?
803    * in essence get random peers with consumption */
804
805   /* Send PUSHes */
806   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size);
807   n_peers = round (alpha * gossip_list_size);
808   if (0 == n_peers)
809     n_peers = 1;
810   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n",
811       n_peers, alpha, gossip_list_size);
812   for ( i = 0 ; i < n_peers ; i++ )
813   {
814     peer = get_rand_peer (gossip_list, gossip_list_size);
815     if (own_identity != peer)
816     { // FIXME if this fails schedule/loop this for later
817       LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer));
818
819       ev = GNUNET_MQ_msg (push_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
820       push_msg = NULL;
821       // FIXME sometimes it returns a pointer to a freed mq
822       mq = get_mq (peer_map, peer);
823       GNUNET_MQ_send (mq, ev);
824     }
825   }
826
827
828   /* Send PULL requests */
829   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
830   n_peers = round (beta * gossip_list_size);
831   if (0 == n_peers)
832     n_peers = 1;
833   LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n",
834       n_peers, beta, gossip_list_size);
835   for ( i = 0 ; i < n_peers ; i++ )
836   {
837     peer = get_rand_peer (gossip_list, gossip_list_size);
838     if (own_identity != peer)
839     { // FIXME if this fails schedule/loop this for later
840       LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer));
841
842       ev = GNUNET_MQ_msg (pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
843       pull_msg = NULL;
844       mq = get_mq (peer_map, peer);
845       GNUNET_MQ_send (mq, ev);
846     }
847   }
848
849
850   /* Update gossip list */
851   uint64_t r_index;
852
853   if ( push_list_size <= alpha * gossip_list_size &&
854        push_list_size != 0 &&
855        pull_list_size != 0 )
856   {
857     LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
858
859     uint64_t first_border;
860     uint64_t second_border;
861     
862     GNUNET_array_grow(gossip_list, gossip_list_size, sampler_size);
863
864     first_border = round(alpha * gossip_list_size);
865     for ( i = 0 ; i < first_border ; i++ )
866     { // TODO use RPS_sampler_get_n_rand_peers
867       /* Update gossip list with peers received through PUSHes */
868       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
869                                        push_list_size);
870       gossip_list[i] = push_list[r_index];
871       // TODO change the in_flags accordingly
872     }
873
874     second_border = first_border + round(beta * gossip_list_size);
875     for ( i = first_border ; i < second_border ; i++ )
876     {
877       /* Update gossip list with peers received through PULLs */
878       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
879                                        pull_list_size);
880       gossip_list[i] = pull_list[r_index];
881       // TODO change the in_flags accordingly
882     }
883
884     for ( i = second_border ; i < gossip_list_size ; i++ )
885     {
886       /* Update gossip list with peers from history */
887       peer = RPS_sampler_get_n_rand_peers (1),
888       gossip_list[i] = *peer;
889       // TODO change the in_flags accordingly
890     }
891
892   }
893   else
894   {
895     LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
896   }
897   // TODO independent of that also get some peers from CADET_get_peers()?
898
899
900   /* Update samplers */
901
902   for ( i = 0 ; i < push_list_size ; i++ )
903   {
904     RPS_sampler_update_list (&push_list[i]);
905     // TODO set in_flag?
906   }
907
908   for ( i = 0 ; i < pull_list_size ; i++ )
909   {
910     RPS_sampler_update_list (&pull_list[i]);
911     // TODO set in_flag?
912   }
913
914
915   /* Empty push/pull lists */
916   GNUNET_array_grow (push_list, push_list_size, 0);
917   GNUNET_array_grow (pull_list, pull_list_size, 0);
918
919   struct GNUNET_TIME_Relative time_next_round;
920   struct GNUNET_TIME_Relative half_round_interval;
921   unsigned int rand_delay;
922
923   /* Compute random time value between .5 * round_interval and 1.5 *round_interval */
924   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
925   do
926   {
927   /*
928    * Compute random value between (0 and 1) * round_interval
929    * via multiplying round_interval with a 'fraction' (0 to value)/value
930    */
931   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT_MAX/10);
932   time_next_round = GNUNET_TIME_relative_multiply (round_interval,  rand_delay);
933   time_next_round = GNUNET_TIME_relative_divide   (time_next_round, UINT_MAX/10);
934   time_next_round = GNUNET_TIME_relative_add      (time_next_round, half_round_interval);
935   } while (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == time_next_round.rel_value_us);
936
937   /* Schedule next round */
938   do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_round, NULL);
939   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
940 }
941
942
943 /**
944  * Open a connection to given peer and store channel and mq.
945  */
946   void
947 insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
948 {
949   // We open a channel to be notified when this peer goes down.
950   (void) get_channel (peer_map, id);
951 }
952
953
954 /**
955  * Close the connection to given peer and delete channel and mq.
956  */
957   void
958 removeCB (void *cls, const struct GNUNET_PeerIdentity *id)
959 {
960   size_t s;
961   struct peer_context *ctx;
962
963   s = RPS_sampler_count_id (id);
964   if ( 1 >= s )
965   {
966     if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id))
967     {
968       ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, id);
969       if (NULL != ctx->to_channel)
970       {
971         if (NULL != ctx->mq)
972         {
973           GNUNET_MQ_destroy (ctx->mq);
974         }
975         // may already be freed at shutdown of cadet
976         //GNUNET_CADET_channel_destroy (ctx->to_channel);
977       }
978       // TODO cleanup peer
979       (void) GNUNET_CONTAINER_multipeermap_remove_all (peer_map, id);
980     }
981   }
982 }
983
984 static void
985 rps_start (struct GNUNET_SERVER_Handle *server);
986
987 /**
988  * This is called from GNUNET_CADET_get_peers().
989  *
990  * It is called on every peer(ID) that cadet somehow has contact with.
991  * We use those to initialise the sampler.
992  */
993 void
994 init_peer_cb (void *cls,
995               const struct GNUNET_PeerIdentity *peer,
996               int tunnel, // "Do we have a tunnel towards this peer?"
997               unsigned int n_paths, // "Number of known paths towards this peer"
998               unsigned int best_path) // "How long is the best path?
999                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
1000 {
1001   struct init_peer_cls *ipc;
1002
1003   ipc = (struct init_peer_cls *) cls;
1004   if ( NULL != peer )
1005   {
1006     LOG (GNUNET_ERROR_TYPE_DEBUG,
1007         "Got %" PRIX32 ". peer %s (at %p) from CADET (gossip_list_size: %u)\n",
1008         ipc->i, GNUNET_i2s (peer), peer, gossip_list_size);
1009     RPS_sampler_update_list (peer);
1010     (void) get_peer_ctx (peer_map, peer); // unneeded? -> insertCB
1011
1012     if (ipc->i < gossip_list_size)
1013     {
1014       gossip_list[ipc->i] = *peer; // FIXME sometimes we're writing to invalid space here
1015                                    // not sure whether fixed
1016       ipc->i++;
1017     }
1018
1019     // send push/pull to each of those peers?
1020   }
1021   else
1022   {
1023     if (ipc->i < gossip_list_size)
1024     {
1025       memcpy(&gossip_list[ipc->i],
1026           RPS_sampler_get_n_rand_peers (1),
1027           (gossip_list_size - ipc->i) * sizeof(struct GNUNET_PeerIdentity));
1028     }
1029     rps_start (ipc->server);
1030     GNUNET_free (ipc);
1031   }
1032 }
1033
1034
1035 /**
1036  * Task run during shutdown.
1037  *
1038  * @param cls unused
1039  * @param tc unused
1040  */
1041 static void
1042 shutdown_task (void *cls,
1043                const struct GNUNET_SCHEDULER_TaskContext *tc)
1044 {
1045   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS is going down\n");
1046
1047   if ( NULL != do_round_task )
1048   {
1049     GNUNET_SCHEDULER_cancel (do_round_task);
1050     do_round_task = NULL;
1051   }
1052
1053   GNUNET_NSE_disconnect (nse);
1054   GNUNET_CADET_disconnect (cadet_handle);
1055   GNUNET_free (own_identity);
1056   RPS_sampler_destroy ();
1057   GNUNET_array_grow (request_deltas, request_deltas_size, 0);
1058   GNUNET_array_grow (gossip_list, gossip_list_size, 0);
1059   GNUNET_array_grow (push_list, push_list_size, 0);
1060   GNUNET_array_grow (pull_list, pull_list_size, 0);
1061 }
1062
1063
1064 /**
1065  * A client disconnected.  Remove all of its data structure entries.
1066  *
1067  * @param cls closure, NULL
1068  * @param client identification of the client
1069  */
1070 static void
1071 handle_client_disconnect (void *cls,
1072                           struct GNUNET_SERVER_Client * client)
1073 {
1074 }
1075
1076 /**
1077  * Handle the channel a peer opens to us.
1078  *
1079  * @param cls The closure
1080  * @param channel The channel the peer wants to establish
1081  * @param initiator The peer's peer ID
1082  * @param port The port the channel is being established over
1083  * @param options Further options
1084  */
1085   static void *
1086 handle_inbound_channel (void *cls,
1087                         struct GNUNET_CADET_Channel *channel,
1088                         const struct GNUNET_PeerIdentity *initiator,
1089                         uint32_t port,
1090                         enum GNUNET_CADET_ChannelOption options)
1091 {
1092   struct peer_context *ctx;
1093
1094   LOG(GNUNET_ERROR_TYPE_DEBUG, "New channel was established to us (Peer %s).\n", GNUNET_i2s(initiator));
1095
1096   GNUNET_assert( NULL != channel );
1097
1098   // we might not even store the from_channel
1099
1100   ctx = get_peer_ctx(peer_map, initiator);
1101   if (NULL != ctx->from_channel)
1102   {
1103     ctx->from_channel = channel;
1104   }
1105
1106   // FIXME there might already be an established channel
1107
1108   //ctx->in_flags = in_other_gossip_list;
1109   ctx->mq = NULL; // TODO create mq?
1110
1111   (void) GNUNET_CONTAINER_multipeermap_put (peer_map, initiator, ctx,
1112       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1113   return NULL; // TODO
1114 }
1115
1116 /**
1117  * This is called when a remote peer destroys a channel.
1118  *
1119  * @param cls The closure
1120  * @param channel The channel being closed
1121  * @param channel_ctx The context associated with this channel
1122  */
1123 static void
1124 cleanup_channel(void *cls,
1125                 const struct GNUNET_CADET_Channel *channel,
1126                 void *channel_ctx)
1127 {
1128   struct GNUNET_PeerIdentity *peer;
1129   LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel to remote peer was destroyed.\n");
1130
1131   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
1132       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
1133        // Guess simply casting isn't the nicest way...
1134        // FIXME wait for cadet to change this function
1135   RPS_sampler_reinitialise_by_value (peer);
1136 }
1137
1138 /**
1139  * Actually start the service.
1140  */
1141 static void
1142 rps_start (struct GNUNET_SERVER_Handle *server)
1143 {
1144   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1145     {&handle_client_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
1146       sizeof (struct GNUNET_RPS_CS_RequestMessage)},
1147     {&handle_client_seed,    NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
1148     {NULL, NULL, 0, 0}
1149   };
1150
1151   GNUNET_SERVER_add_handlers (server, handlers);
1152   GNUNET_SERVER_disconnect_notify (server,
1153                                    &handle_client_disconnect,
1154                                    NULL);
1155   LOG(GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
1156
1157
1158   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1159   LOG(GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
1160
1161   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1162                                 &shutdown_task,
1163                                 NULL);
1164 }
1165
1166
1167 /**
1168  * Process statistics requests.
1169  *
1170  * @param cls closure
1171  * @param server the initialized server
1172  * @param c configuration to use
1173  */
1174 static void
1175 run (void *cls,
1176      struct GNUNET_SERVER_Handle *server,
1177      const struct GNUNET_CONFIGURATION_Handle *c)
1178 {
1179   // TODO check what this does -- copied from gnunet-boss
1180   // - seems to work as expected
1181   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
1182
1183   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
1184
1185   struct init_peer_cls *ipc;
1186
1187   cfg = c;
1188
1189
1190   /* Get own ID */
1191   own_identity = GNUNET_new (struct GNUNET_PeerIdentity);
1192   GNUNET_CRYPTO_get_peer_identity (cfg, own_identity); // TODO check return value
1193   GNUNET_assert (NULL != own_identity);
1194   LOG (GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s(own_identity), own_identity);
1195
1196
1197   /* Get time interval from the configuration */
1198   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
1199                                                         "ROUNDINTERVAL",
1200                                                         &round_interval))
1201   {
1202     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to read ROUNDINTERVAL from config\n");
1203     GNUNET_SCHEDULER_shutdown();
1204     return;
1205   }
1206
1207   /* Get initial size of sampler/gossip list from the configuration */
1208   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS",
1209                                                          "INITSIZE",
1210                                                          (long long unsigned int *) &sampler_size))
1211   {
1212     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n");
1213     GNUNET_SCHEDULER_shutdown ();
1214     return;
1215   }
1216   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size);
1217
1218   //gossip_list_size = sampler_size; // TODO rename sampler_size
1219
1220   gossip_list = NULL;
1221   GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size);
1222
1223
1224   /* connect to NSE */
1225   nse = GNUNET_NSE_connect(cfg, nse_callback, NULL);
1226   // TODO check whether that was successful
1227   // TODO disconnect on shutdown
1228   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
1229
1230
1231   alpha = 0.45;
1232   beta  = 0.45;
1233   // TODO initialise thresholds - ?
1234
1235   /* Get alpha from the configuration */
1236   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1237                                                          "ALPHA",
1238                                                          &alpha))
1239   {
1240     LOG(GNUNET_ERROR_TYPE_DEBUG, "No ALPHA specified in the config\n");
1241   }
1242   LOG(GNUNET_ERROR_TYPE_DEBUG, "ALPHA is %f\n", alpha);
1243  
1244   /* Get beta from the configuration */
1245   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1246                                                          "BETA",
1247                                                          &beta))
1248   {
1249     LOG (GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
1250   }
1251   LOG (GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
1252
1253   // TODO check that alpha + beta < 1
1254
1255   peer_map = GNUNET_CONTAINER_multipeermap_create (sampler_size, GNUNET_NO);
1256
1257
1258   /* Initialise cadet */
1259   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1260     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        , 0},
1261     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
1262     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
1263     {NULL, 0, 0}
1264   };
1265
1266   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
1267   cadet_handle = GNUNET_CADET_connect (cfg,
1268                                     cls,
1269                                     &handle_inbound_channel,
1270                                     &cleanup_channel,
1271                                     cadet_handlers,
1272                                     ports);
1273   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
1274
1275
1276   /* Initialise sampler */
1277   RPS_sampler_init (sampler_size, own_identity, insertCB, NULL, removeCB, NULL);
1278
1279   /* Initialise push and pull maps */
1280   push_list = NULL;
1281   push_list_size = 0;
1282   pull_list = NULL;
1283   pull_list_size = 0;
1284
1285
1286   ipc = GNUNET_new (struct init_peer_cls);
1287   ipc->server = server;
1288   ipc->i = 0;
1289   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
1290   GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc);
1291
1292   // TODO send push/pull to each of those peers?
1293 }
1294
1295
1296 /**
1297  * The main function for the rps service.
1298  *
1299  * @param argc number of arguments from the command line
1300  * @param argv command line arguments
1301  * @return 0 ok, 1 on error
1302  */
1303 int
1304 main (int argc, char *const *argv)
1305 {
1306   return (GNUNET_OK ==
1307           GNUNET_SERVICE_run (argc,
1308                               argv,
1309                               "rps",
1310                               GNUNET_SERVICE_OPTION_NONE,
1311                               &run, NULL)) ? 0 : 1;
1312 }
1313
1314 /* end of gnunet-service-rps.c */