tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / rps / rps_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file rps/rps_api.c
23  * @brief API for rps
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29 #include "gnunet_rps_service.h"
30 #include "rps-sampler_client.h"
31
32 #include "gnunet_nse_service.h"
33
34 #include <inttypes.h>
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
37
38 /**
39  * Handle for a request to get peers from biased stream of ids
40  */
41 struct GNUNET_RPS_StreamRequestHandle
42 {
43   /**
44    * The client issuing the request.
45    */
46   struct GNUNET_RPS_Handle *rps_handle;
47
48   /**
49    * The callback to be called when we receive an answer.
50    */
51   GNUNET_RPS_NotifyReadyCB ready_cb;
52
53   /**
54    * The closure for the callback.
55    */
56   void *ready_cb_cls;
57
58   /**
59    * @brief Scheduler task for scheduled callback
60    */
61   struct GNUNET_SCHEDULER_Task *callback_task;
62
63   /**
64    * @brief Next element of the DLL
65    */
66   struct GNUNET_RPS_StreamRequestHandle *next;
67
68   /**
69    * @brief Previous element of the DLL
70    */
71   struct GNUNET_RPS_StreamRequestHandle *prev;
72 };
73
74
75 /**
76  * Handler to handle requests from a client.
77  */
78 struct GNUNET_RPS_Handle
79 {
80   /**
81    * The handle to the client configuration.
82    */
83   const struct GNUNET_CONFIGURATION_Handle *cfg;
84
85   /**
86    * The message queue to the client.
87    */
88   struct GNUNET_MQ_Handle *mq;
89
90   /**
91    * @brief Callback called on each update of the view
92    */
93   GNUNET_RPS_NotifyReadyCB view_update_cb;
94
95   /**
96    * @brief Closure to each requested update of the view
97    */
98   void *view_update_cls;
99
100   /**
101    * @brief Closure to each requested peer from the biased stream
102    */
103   void *stream_input_cls;
104
105   /**
106    * @brief Head of the DLL of stream requests
107    */
108   struct GNUNET_RPS_StreamRequestHandle *stream_requests_head;
109
110   /**
111    * @brief Tail of the DLL of stream requests
112    */
113   struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
114
115   /**
116    * @brief Handle to nse service
117    */
118   struct GNUNET_NSE_Handle *nse;
119
120   /**
121    * @brief Pointer to the head element in DLL of request handles
122    */
123   struct GNUNET_RPS_Request_Handle *rh_head;
124
125   /**
126    * @brief Pointer to the tail element in DLL of request handles
127    */
128   struct GNUNET_RPS_Request_Handle *rh_tail;
129
130   /**
131    * @brief Pointer to the head element in DLL of single request handles
132    */
133   struct GNUNET_RPS_Request_Handle_Single_Info *rhs_head;
134
135   /**
136    * @brief Pointer to the tail element in DLL of single request handles
137    */
138   struct GNUNET_RPS_Request_Handle_Single_Info *rhs_tail;
139
140   /**
141    * @brief The desired probability with which we want to have observed all
142    * peers.
143    */
144   float desired_probability;
145
146   /**
147    * @brief A factor that catches the 'bias' of a random stream of peer ids.
148    *
149    * As introduced by Brahms: Factor between the number of unique ids in a
150    * truly random stream and number of unique ids in the gossip stream.
151    */
152   float deficiency_factor;
153 };
154
155
156 /**
157  * Handler for a single request from a client.
158  */
159 struct GNUNET_RPS_Request_Handle
160 {
161   /**
162    * The client issuing the request.
163    */
164   struct GNUNET_RPS_Handle *rps_handle;
165
166   /**
167    * The number of requested peers.
168    */
169   uint32_t num_requests;
170
171   /**
172    * @brief The Sampler for the client request
173    */
174   struct RPS_Sampler *sampler;
175
176   /**
177    * @brief Request handle of the request to the sampler - needed to cancel the request
178    */
179   struct RPS_SamplerRequestHandle *sampler_rh;
180
181   /**
182    * @brief Request handle of the request of the biased stream of peers -
183    * needed to cancel the request
184    */
185   struct GNUNET_RPS_StreamRequestHandle *srh;
186
187   /**
188    * The callback to be called when we receive an answer.
189    */
190   GNUNET_RPS_NotifyReadyCB ready_cb;
191
192   /**
193    * The closure for the callback.
194    */
195   void *ready_cb_cls;
196
197   /**
198    * @brief Pointer to next element in DLL
199    */
200   struct GNUNET_RPS_Request_Handle *next;
201
202   /**
203    * @brief Pointer to previous element in DLL
204    */
205   struct GNUNET_RPS_Request_Handle *prev;
206 };
207
208
209 /**
210  * Handler for a single request from a client.
211  */
212 struct GNUNET_RPS_Request_Handle_Single_Info
213 {
214   /**
215    * The client issuing the request.
216    */
217   struct GNUNET_RPS_Handle *rps_handle;
218
219   /**
220    * @brief The Sampler for the client request
221    */
222   struct RPS_Sampler *sampler;
223
224   /**
225    * @brief Request handle of the request to the sampler - needed to cancel the request
226    */
227   struct RPS_SamplerRequestHandleSingleInfo *sampler_rh;
228
229   /**
230    * @brief Request handle of the request of the biased stream of peers -
231    * needed to cancel the request
232    */
233   struct GNUNET_RPS_StreamRequestHandle *srh;
234
235   /**
236    * The callback to be called when we receive an answer.
237    */
238   GNUNET_RPS_NotifyReadySingleInfoCB ready_cb;
239
240   /**
241    * The closure for the callback.
242    */
243   void *ready_cb_cls;
244
245   /**
246    * @brief Pointer to next element in DLL
247    */
248   struct GNUNET_RPS_Request_Handle_Single_Info *next;
249
250   /**
251    * @brief Pointer to previous element in DLL
252    */
253   struct GNUNET_RPS_Request_Handle_Single_Info *prev;
254 };
255
256
257 /**
258  * Struct used to pack the callback, its closure (provided by the caller)
259  * and the connection handler to the service to pass it to a callback function.
260  */
261 struct cb_cls_pack
262 {
263   /**
264    * Callback provided by the client
265    */
266   GNUNET_RPS_NotifyReadyCB cb;
267
268   /**
269    * Closure provided by the client
270    */
271   void *cls;
272
273   /**
274    * Handle to the service connection
275    */
276  struct GNUNET_CLIENT_Connection *service_conn;
277 };
278
279
280 /**
281  * @brief Peers received from the biased stream to be passed to all
282  * srh_handlers
283  */
284 static struct GNUNET_PeerIdentity *srh_callback_peers;
285
286 /**
287  * @brief Number of peers in the biased stream that are to be passed to all
288  * srh_handlers
289  */
290 static uint64_t srh_callback_num_peers;
291
292
293 /**
294  * @brief Create a new handle for a stream request
295  *
296  * @param rps_handle The rps handle
297  * @param num_peers The number of desired peers
298  * @param ready_cb The callback to be called, once all peers are ready
299  * @param cls The colsure to provide to the callback
300  *
301  * @return The handle to the stream request
302  */
303 static struct GNUNET_RPS_StreamRequestHandle *
304 new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
305                     GNUNET_RPS_NotifyReadyCB ready_cb,
306                     void *cls)
307 {
308   struct GNUNET_RPS_StreamRequestHandle *srh;
309
310   srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
311   srh->rps_handle = rps_handle;
312   srh->ready_cb = ready_cb;
313   srh->ready_cb_cls = cls;
314   GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
315                                rps_handle->stream_requests_tail,
316                                srh);
317
318   return srh;
319 }
320
321
322 /**
323  * @brief Remove the given stream request from the list of requests and memory
324  *
325  * @param srh The request to be removed
326  */
327 static void
328 remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh)
329 {
330   struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle;
331
332   GNUNET_assert (NULL != srh);
333   if (NULL != srh->callback_task)
334   {
335     GNUNET_SCHEDULER_cancel (srh->callback_task);
336     srh->callback_task = NULL;
337   }
338   GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
339                                rps_handle->stream_requests_tail,
340                                srh);
341   GNUNET_free (srh);
342 }
343
344
345 /**
346  * @brief Called once the sampler has collected all requested peers.
347  *
348  * Calls the callback provided by the client with the corresponding cls.
349  *
350  * @param peers The array of @a num_peers that has been returned.
351  * @param num_peers The number of peers that have been returned
352  * @param cls The #GNUNET_RPS_Request_Handle
353  */
354 static void
355 peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
356                 uint32_t num_peers,
357                 void *cls)
358 {
359   struct GNUNET_RPS_Request_Handle *rh = cls;
360
361   rh->sampler_rh = NULL;
362   rh->ready_cb (rh->ready_cb_cls,
363                 num_peers,
364                 peers);
365   GNUNET_RPS_request_cancel (rh);
366 }
367
368
369 /**
370  * @brief Called once the sampler has collected the requested peer.
371  *
372  * Calls the callback provided by the client with the corresponding cls.
373  *
374  * @param peers The array of @a num_peers that has been returned.
375  * @param num_peers The number of peers that have been returned
376  * @param cls The #GNUNET_RPS_Request_Handle
377  * @param probability Probability with which all IDs have been observed
378  * @param num_observed Number of observed IDs
379  */
380 static void
381 peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers,
382                     void *cls,
383                     double probability,
384                     uint32_t num_observed)
385 {
386   struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls;
387
388   rh->sampler_rh = NULL;
389   rh->ready_cb (rh->ready_cb_cls,
390                 peers,
391                 probability,
392                 num_observed);
393   GNUNET_RPS_request_single_info_cancel (rh);
394 }
395
396
397 /**
398  * @brief Callback to collect the peers from the biased stream and put those
399  * into the sampler.
400  *
401  * @param cls The #GNUNET_RPS_Request_Handle
402  * @param num_peers The number of peer that have been returned
403  * @param peers The array of @a num_peers that have been returned
404  */
405 static void
406 collect_peers_cb (void *cls,
407                   uint64_t num_peers,
408                   const struct GNUNET_PeerIdentity *peers)
409 {
410   struct GNUNET_RPS_Request_Handle *rh = cls;
411
412   LOG (GNUNET_ERROR_TYPE_DEBUG,
413        "Service sent %" PRIu64 " peers from stream\n",
414        num_peers);
415   for (uint64_t i = 0; i < num_peers; i++)
416   {
417     RPS_sampler_update (rh->sampler, &peers[i]);
418   }
419 }
420
421
422 /**
423  * @brief Callback to collect the peers from the biased stream and put those
424  * into the sampler.
425  *
426  * This version is for the modified #GNUNET_RPS_Request_Handle_Single_Info
427  *
428  * @param cls The #GNUNET_RPS_Request_Handle
429  * @param num_peers The number of peer that have been returned
430  * @param peers The array of @a num_peers that have been returned
431  */
432 static void
433 collect_peers_info_cb (void *cls,
434                        uint64_t num_peers,
435                        const struct GNUNET_PeerIdentity *peers)
436 {
437   struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls;
438
439   LOG (GNUNET_ERROR_TYPE_DEBUG,
440        "Service sent %" PRIu64 " peers from stream\n",
441        num_peers);
442   for (uint64_t i = 0; i < num_peers; i++)
443   {
444     RPS_sampler_update (rhs->sampler, &peers[i]);
445   }
446 }
447
448
449 /* Get internals for debugging/profiling purposes */
450
451 /**
452  * Request updates of view
453  *
454  * @param rps_handle handle to the rps service
455  * @param num_req_peers number of peers we want to receive
456  *        (0 for infinite updates)
457  * @param cls a closure that will be given to the callback
458  * @param ready_cb the callback called when the peers are available
459  */
460 void
461 GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
462                          uint32_t num_updates,
463                          GNUNET_RPS_NotifyReadyCB view_update_cb,
464                          void *cls)
465 {
466   struct GNUNET_MQ_Envelope *ev;
467   struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg;
468
469   LOG (GNUNET_ERROR_TYPE_DEBUG,
470        "Client requests %" PRIu32 " view updates\n",
471        num_updates);
472   rps_handle->view_update_cb = view_update_cb;
473   rps_handle->view_update_cls = cls;
474
475   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST);
476   msg->num_updates = htonl (num_updates);
477   GNUNET_MQ_send (rps_handle->mq, ev);
478 }
479
480
481 void
482 GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle)
483 {
484   struct GNUNET_MQ_Envelope *ev;
485
486   GNUNET_assert (NULL != rps_handle->view_update_cb);
487
488   rps_handle->view_update_cb = NULL;
489
490   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL);
491   GNUNET_MQ_send (rps_handle->mq, ev);
492 }
493
494
495 /**
496  * Request biased stream of peers that are being put into the sampler
497  *
498  * @param rps_handle handle to the rps service
499  * @param cls a closure that will be given to the callback
500  * @param ready_cb the callback called when the peers are available
501  */
502 struct GNUNET_RPS_StreamRequestHandle *
503 GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
504                            GNUNET_RPS_NotifyReadyCB stream_input_cb,
505                            void *cls)
506 {
507   struct GNUNET_RPS_StreamRequestHandle *srh;
508   struct GNUNET_MQ_Envelope *ev;
509   struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
510
511   srh = new_stream_request (rps_handle,
512                             stream_input_cb,
513                             cls);
514   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
515
516   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
517   GNUNET_MQ_send (rps_handle->mq, ev);
518   return srh;
519 }
520
521
522 /**
523  * This function is called, when the service updates the view.
524  * It verifies that @a msg is well-formed.
525  *
526  * @param cls the closure
527  * @param msg the message
528  * @return #GNUNET_OK if @a msg is well-formed
529  */
530 static int
531 check_view_update (void *cls,
532                    const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
533 {
534   uint16_t msize = ntohs (msg->header.size);
535   uint32_t num_peers = ntohl (msg->num_peers);
536   (void) cls;
537
538   msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_ViewReply);
539   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
540        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
541   {
542     GNUNET_break (0);
543     return GNUNET_SYSERR;
544   }
545   return GNUNET_OK;
546 }
547
548
549 /**
550  * This function is called, when the service updated its view.
551  * It calls the callback the caller provided
552  * and disconnects afterwards.
553  *
554  * @param msg the message
555  */
556 static void
557 handle_view_update (void *cls,
558                     const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
559 {
560   struct GNUNET_RPS_Handle *h = cls;
561   struct GNUNET_PeerIdentity *peers;
562
563   /* Give the peers back */
564   LOG (GNUNET_ERROR_TYPE_DEBUG,
565        "New view of %" PRIu32 " peers:\n",
566        ntohl (msg->num_peers));
567
568   peers = (struct GNUNET_PeerIdentity *) &msg[1];
569   GNUNET_assert (NULL != h);
570   GNUNET_assert (NULL != h->view_update_cb);
571   h->view_update_cb (h->view_update_cls, ntohl (msg->num_peers), peers);
572 }
573
574
575 /**
576  * @brief Send message to service that this client does not want to receive
577  * further updates from the biased peer stream
578  *
579  * @param rps_handle The handle representing the service to the client
580  */
581 static void
582 cancel_stream (struct GNUNET_RPS_Handle *rps_handle)
583 {
584   struct GNUNET_MQ_Envelope *ev;
585
586   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL);
587   GNUNET_MQ_send (rps_handle->mq, ev);
588 }
589
590
591 /**
592  * @brief Cancel a specific request for updates from the biased peer stream
593  *
594  * @param srh The request handle to cancel
595  */
596 void
597 GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
598 {
599   struct GNUNET_RPS_Handle *rps_handle;
600
601   rps_handle = srh->rps_handle;
602   remove_stream_request (srh);
603   if (NULL == rps_handle->stream_requests_head)
604     cancel_stream (rps_handle);
605 }
606
607
608 /**
609  * This function is called, when the service sends another peer from the biased
610  * stream.
611  * It calls the callback the caller provided
612  * and disconnects afterwards.
613  *
614  * TODO merge with check_view_update
615  *
616  * @param msg the message
617  */
618 static int
619 check_stream_input (void *cls,
620                     const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
621 {
622   uint16_t msize = ntohs (msg->header.size);
623   uint32_t num_peers = ntohl (msg->num_peers);
624   (void) cls;
625
626   msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
627   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
628        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
629   {
630     GNUNET_break (0);
631     return GNUNET_SYSERR;
632   }
633   return GNUNET_OK;
634 }
635
636
637 /**
638  * @brief Called by the scheduler to call the callbacks of the srh handlers
639  *
640  * @param cls Stream request handle
641  */
642 static void
643 srh_callback_scheduled (void *cls)
644 {
645   struct GNUNET_RPS_StreamRequestHandle *srh = cls;
646
647   srh->callback_task = NULL;
648   srh->ready_cb (srh->ready_cb_cls,
649                  srh_callback_num_peers,
650                  srh_callback_peers);
651 }
652
653
654 /**
655  * This function is called, when the service sends another peer from the biased
656  * stream.
657  * It calls the callback the caller provided
658  * and disconnects afterwards.
659  *
660  * @param msg the message
661  */
662 static void
663 handle_stream_input (void *cls,
664                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
665 {
666   struct GNUNET_RPS_Handle *h = cls;
667   //const struct GNUNET_PeerIdentity *peers;
668   uint64_t num_peers;
669   struct GNUNET_RPS_StreamRequestHandle *srh_iter;
670   struct GNUNET_RPS_StreamRequestHandle *srh_next;
671
672   //peers = (struct GNUNET_PeerIdentity *) &msg[1];
673   num_peers = ntohl (msg->num_peers);
674   srh_callback_num_peers = num_peers;
675   GNUNET_free_non_null (srh_callback_peers);
676   srh_callback_peers = GNUNET_new_array (num_peers,
677                                          struct GNUNET_PeerIdentity);
678   GNUNET_memcpy (srh_callback_peers,
679                  &msg[1],
680                  num_peers * sizeof (struct GNUNET_PeerIdentity));
681   LOG (GNUNET_ERROR_TYPE_DEBUG,
682        "Received %" PRIu64 " peer(s) from stream input.\n",
683        num_peers);
684   for (srh_iter = h->stream_requests_head;
685        NULL != srh_iter;
686        srh_iter = srh_next)
687   {
688     LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
689     /* Store next pointer - srh might be removed/freed in callback */
690     srh_next = srh_iter->next;
691     if (NULL != srh_iter->callback_task)
692       GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
693     srh_iter->callback_task =
694       GNUNET_SCHEDULER_add_now (&srh_callback_scheduled,
695                                 srh_iter);
696   }
697
698   if (NULL == h->stream_requests_head)
699   {
700     cancel_stream (h);
701   }
702 }
703
704
705 /**
706  * Reconnect to the service
707  */
708 static void
709 reconnect (struct GNUNET_RPS_Handle *h);
710
711
712 /**
713  * Error handler for mq.
714  *
715  * This function is called whan mq encounters an error.
716  * Until now mq doesn't provide useful error messages.
717  *
718  * @param cls the closure
719  * @param error error code without specyfied meaning
720  */
721 static void
722 mq_error_handler (void *cls,
723                   enum GNUNET_MQ_Error error)
724 {
725   struct GNUNET_RPS_Handle *h = cls;
726   //TODO LOG
727   LOG (GNUNET_ERROR_TYPE_WARNING, "Problem with message queue. error: %i\n\
728        1: READ,\n\
729        2: WRITE,\n\
730        4: TIMEOUT\n",
731        // TODO: write GNUNET_MQ_strerror (error)
732        error);
733   reconnect (h);
734   /* Resend all pending request as the service destroyed its knowledge
735    * about them */
736 }
737
738
739 /**
740  * @brief Create the hash value from the share value that defines the sub
741  * (-group)
742  *
743  * @param share_val Share value
744  * @param hash[out] Pointer to the location in which the hash will be stored.
745  */
746 static void
747 hash_from_share_val (const char *share_val,
748                      struct GNUNET_HashCode *hash)
749 {
750   GNUNET_CRYPTO_kdf (hash,
751                      sizeof (struct GNUNET_HashCode),
752                      "rps",
753                      strlen ("rps"),
754                      share_val,
755                      strlen (share_val),
756                      NULL, 0);
757 }
758
759
760 /**
761  * @brief Callback for network size estimate - called with new estimates about
762  * the network size, updates all samplers with the new estimate
763  *
764  * Implements #GNUNET_NSE_Callback
765  *
766  * @param cls the rps handle
767  * @param timestamp unused
768  * @param logestimate the estimate
769  * @param std_dev the standard distribution
770  */
771 static void
772 nse_cb (void *cls,
773         struct GNUNET_TIME_Absolute timestamp,
774         double logestimate,
775         double std_dev)
776 {
777   struct GNUNET_RPS_Handle *h = cls;
778   (void) timestamp;
779   (void) std_dev;
780
781   for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
782        NULL != rh_iter && NULL != rh_iter->next;
783        rh_iter = rh_iter->next)
784   {
785     RPS_sampler_update_with_nw_size (rh_iter->sampler,
786                                      GNUNET_NSE_log_estimate_to_n (logestimate));
787   }
788   for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
789        NULL != rhs_iter && NULL != rhs_iter->next;
790        rhs_iter = rhs_iter->next)
791   {
792     RPS_sampler_update_with_nw_size (rhs_iter->sampler,
793                                      GNUNET_NSE_log_estimate_to_n (logestimate));
794   }
795 }
796
797
798 /**
799  * Reconnect to the service
800  */
801 static void
802 reconnect (struct GNUNET_RPS_Handle *h)
803 {
804   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
805     GNUNET_MQ_hd_var_size (view_update,
806                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
807                            struct GNUNET_RPS_CS_DEBUG_ViewReply,
808                            h),
809     GNUNET_MQ_hd_var_size (stream_input,
810                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
811                            struct GNUNET_RPS_CS_DEBUG_StreamReply,
812                            h),
813     GNUNET_MQ_handler_end ()
814   };
815
816   if (NULL != h->mq)
817     GNUNET_MQ_destroy (h->mq);
818   h->mq = GNUNET_CLIENT_connect (h->cfg,
819                                  "rps",
820                                  mq_handlers,
821                                  &mq_error_handler,
822                                  h);
823   if (NULL != h->nse)
824     GNUNET_NSE_disconnect (h->nse);
825   h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h);
826 }
827
828
829 /**
830  * Connect to the rps service
831  *
832  * @param cfg configuration to use
833  * @return a handle to the service, NULL on error
834  */
835 struct GNUNET_RPS_Handle *
836 GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
837 {
838   struct GNUNET_RPS_Handle *h;
839
840   h = GNUNET_new (struct GNUNET_RPS_Handle);
841   h->cfg = cfg;
842   if (GNUNET_OK !=
843       GNUNET_CONFIGURATION_get_value_float (cfg,
844                                            "RPS",
845                                            "DESIRED_PROBABILITY",
846                                            &h->desired_probability))
847   {
848     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
849                                "RPS", "DESIRED_PROBABILITY");
850     GNUNET_free (h);
851     return NULL;
852   }
853   if (0 > h->desired_probability ||
854       1 < h->desired_probability)
855   {
856     LOG (GNUNET_ERROR_TYPE_ERROR,
857         "The desired probability must be in the interval [0;1]\n");
858     GNUNET_free (h);
859     return NULL;
860   }
861   if (GNUNET_OK !=
862       GNUNET_CONFIGURATION_get_value_float (cfg,
863                                            "RPS",
864                                            "DEFICIENCY_FACTOR",
865                                            &h->deficiency_factor))
866   {
867     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
868                                "RPS", "DEFICIENCY_FACTOR");
869     GNUNET_free (h);
870     return NULL;
871   }
872   if (0 > h->desired_probability ||
873       1 < h->desired_probability)
874   {
875     LOG (GNUNET_ERROR_TYPE_ERROR,
876         "The deficiency factor must be in the interval [0;1]\n");
877     GNUNET_free (h);
878     return NULL;
879   }
880   reconnect (h);
881   if (NULL == h->mq)
882   {
883     GNUNET_free (h);
884     return NULL;
885   }
886   return h;
887 }
888
889
890 /**
891  * @brief Start a sub with the given shared value
892  *
893  * @param h Handle to rps
894  * @param shared_value The shared value that defines the members of the sub (-gorup)
895  */
896 void
897 GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h,
898                       const char *shared_value)
899 {
900   struct GNUNET_RPS_CS_SubStartMessage *msg;
901   struct GNUNET_MQ_Envelope *ev;
902
903   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START);
904   hash_from_share_val (shared_value, &msg->hash);
905   msg->round_interval = GNUNET_TIME_relative_hton (// TODO read from config!
906     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30));
907   GNUNET_assert (0 != msg->round_interval.rel_value_us__);
908
909   GNUNET_MQ_send (h->mq, ev);
910 }
911
912
913 /**
914  * @brief Stop a sub with the given shared value
915  *
916  * @param h Handle to rps
917  * @param shared_value The shared value that defines the members of the sub (-gorup)
918  */
919 void
920 GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h,
921                      const char *shared_value)
922 {
923   struct GNUNET_RPS_CS_SubStopMessage *msg;
924   struct GNUNET_MQ_Envelope *ev;
925
926   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP);
927   hash_from_share_val (shared_value, &msg->hash);
928
929   GNUNET_MQ_send (h->mq, ev);
930 }
931
932
933 /**
934  * Request n random peers.
935  *
936  * @param rps_handle handle to the rps service
937  * @param num_req_peers number of peers we want to receive
938  * @param ready_cb the callback called when the peers are available
939  * @param cls closure given to the callback
940  * @return a handle to cancel this request
941  */
942 struct GNUNET_RPS_Request_Handle *
943 GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
944                           uint32_t num_req_peers,
945                           GNUNET_RPS_NotifyReadyCB ready_cb,
946                           void *cls)
947 {
948   struct GNUNET_RPS_Request_Handle *rh;
949
950   LOG (GNUNET_ERROR_TYPE_INFO,
951        "Client requested %" PRIu32 " peers\n",
952        num_req_peers);
953   rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
954   rh->rps_handle = rps_handle;
955   rh->num_requests = num_req_peers;
956   rh->sampler = RPS_sampler_mod_init (num_req_peers,
957                                       GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
958   RPS_sampler_set_desired_probability (rh->sampler,
959                                        rps_handle->desired_probability);
960   RPS_sampler_set_deficiency_factor (rh->sampler,
961                                      rps_handle->deficiency_factor);
962   rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
963                                                  num_req_peers,
964                                                  peers_ready_cb,
965                                                  rh);
966   rh->srh = GNUNET_RPS_stream_request (rps_handle,
967                                        collect_peers_cb,
968                                        rh); /* cls */
969   rh->ready_cb = ready_cb;
970   rh->ready_cb_cls = cls;
971   GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head,
972                                rps_handle->rh_tail,
973                                rh);
974
975   return rh;
976 }
977
978
979 /**
980  * Request one random peer, getting additional information.
981  *
982  * @param rps_handle handle to the rps service
983  * @param ready_cb the callback called when the peers are available
984  * @param cls closure given to the callback
985  * @return a handle to cancel this request
986  */
987 struct GNUNET_RPS_Request_Handle_Single_Info *
988 GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle,
989                               GNUNET_RPS_NotifyReadySingleInfoCB ready_cb,
990                               void *cls)
991 {
992   struct GNUNET_RPS_Request_Handle_Single_Info *rhs;
993   uint32_t num_req_peers = 1;
994
995   LOG (GNUNET_ERROR_TYPE_INFO,
996        "Client requested peer with additional info\n");
997   rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info);
998   rhs->rps_handle = rps_handle;
999   rhs->sampler = RPS_sampler_mod_init (num_req_peers,
1000                                       GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
1001   RPS_sampler_set_desired_probability (rhs->sampler,
1002                                        rps_handle->desired_probability);
1003   RPS_sampler_set_deficiency_factor (rhs->sampler,
1004                                      rps_handle->deficiency_factor);
1005   rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler,
1006                                                    peer_info_ready_cb,
1007                                                    rhs);
1008   rhs->srh = GNUNET_RPS_stream_request (rps_handle,
1009                                        collect_peers_info_cb,
1010                                        rhs); /* cls */
1011   rhs->ready_cb = ready_cb;
1012   rhs->ready_cb_cls = cls;
1013   GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head,
1014                                rps_handle->rhs_tail,
1015                                rhs);
1016
1017   return rhs;
1018 }
1019
1020
1021 /**
1022  * Seed rps service with peerIDs.
1023  *
1024  * @param h handle to the rps service
1025  * @param n number of peers to seed
1026  * @param ids the ids of the peers seeded
1027  */
1028 void
1029 GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
1030                      uint32_t n,
1031                      const struct GNUNET_PeerIdentity *ids)
1032 {
1033   size_t size_needed;
1034   uint32_t num_peers_max;
1035   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
1036   struct GNUNET_MQ_Envelope *ev;
1037   struct GNUNET_RPS_CS_SeedMessage *msg;
1038
1039   LOG (GNUNET_ERROR_TYPE_DEBUG,
1040        "Client wants to seed %" PRIu32 " peers:\n",
1041        n);
1042   for (unsigned int i = 0 ; i < n ; i++)
1043     LOG (GNUNET_ERROR_TYPE_DEBUG,
1044          "%u. peer: %s\n",
1045          i,
1046          GNUNET_i2s (&ids[i]));
1047
1048   /* The actual size the message occupies */
1049   size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
1050     n * sizeof (struct GNUNET_PeerIdentity);
1051   /* The number of peers that fits in one message together with
1052    * the respective header */
1053   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
1054       sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
1055     sizeof (struct GNUNET_PeerIdentity);
1056   tmp_peer_pointer = ids;
1057
1058   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1059   {
1060     ev = GNUNET_MQ_msg_extra (msg,
1061                               num_peers_max * sizeof (struct GNUNET_PeerIdentity),
1062                               GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
1063     msg->num_peers = htonl (num_peers_max);
1064     GNUNET_memcpy (&msg[1],
1065                    tmp_peer_pointer,
1066                    num_peers_max * sizeof (struct GNUNET_PeerIdentity));
1067     GNUNET_MQ_send (h->mq,
1068                     ev);
1069     n -= num_peers_max;
1070     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
1071                   n * sizeof (struct GNUNET_PeerIdentity);
1072     /* Set pointer to beginning of next block of num_peers_max peers */
1073     tmp_peer_pointer = &ids[num_peers_max];
1074   }
1075
1076   ev = GNUNET_MQ_msg_extra (msg,
1077                             n * sizeof (struct GNUNET_PeerIdentity),
1078                             GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
1079   msg->num_peers = htonl (n);
1080   GNUNET_memcpy (&msg[1],
1081                  tmp_peer_pointer,
1082                  n * sizeof (struct GNUNET_PeerIdentity));
1083   GNUNET_MQ_send (h->mq,
1084                   ev);
1085 }
1086
1087
1088 #if ENABLE_MALICIOUS
1089 /**
1090  * Turn RPS service to act malicious.
1091  *
1092  * @param h handle to the rps service
1093  * @param type which type of malicious peer to turn to.
1094  *             0 Don't act malicious at all
1095  *             1 Try to maximise representation
1096  *             2 Try to partition the network
1097  *               (isolate one peer from the rest)
1098  * @param n number of @a ids
1099  * @param ids the ids of the malicious peers
1100  *            if @type is 2 the last id is the id of the
1101  *            peer to be isolated from the rest
1102  */
1103 void
1104 GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
1105                           uint32_t type,
1106                           uint32_t num_peers,
1107                           const struct GNUNET_PeerIdentity *peer_ids,
1108                           const struct GNUNET_PeerIdentity *target_peer)
1109 {
1110   size_t size_needed;
1111   uint32_t num_peers_max;
1112   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
1113   struct GNUNET_MQ_Envelope *ev;
1114   struct GNUNET_RPS_CS_ActMaliciousMessage *msg;
1115
1116   unsigned int i;
1117
1118   LOG (GNUNET_ERROR_TYPE_DEBUG,
1119        "Client turns malicious (type %" PRIu32 ") with %" PRIu32 " other peers:\n",
1120        type,
1121        num_peers);
1122   for (i = 0 ; i < num_peers ; i++)
1123     LOG (GNUNET_ERROR_TYPE_DEBUG,
1124          "%u. peer: %s\n",
1125          i,
1126          GNUNET_i2s (&peer_ids[i]));
1127
1128   /* The actual size the message would occupy */
1129   size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
1130     num_peers * sizeof (struct GNUNET_PeerIdentity);
1131   /* The number of peers that fit in one message together with
1132    * the respective header */
1133   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
1134       sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
1135     sizeof (struct GNUNET_PeerIdentity);
1136   tmp_peer_pointer = peer_ids;
1137
1138   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1139   {
1140     LOG (GNUNET_ERROR_TYPE_DEBUG,
1141          "Too many peers to send at once, sending %" PRIu32 " (all we can so far)\n",
1142          num_peers_max);
1143     ev = GNUNET_MQ_msg_extra (msg,
1144                               num_peers_max * sizeof (struct GNUNET_PeerIdentity),
1145                               GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1146     msg->type = htonl (type);
1147     msg->num_peers = htonl (num_peers_max);
1148     if ( (2 == type) ||
1149          (3 == type) )
1150       msg->attacked_peer = peer_ids[num_peers];
1151     GNUNET_memcpy (&msg[1],
1152             tmp_peer_pointer,
1153             num_peers_max * sizeof (struct GNUNET_PeerIdentity));
1154
1155     GNUNET_MQ_send (h->mq, ev);
1156
1157     num_peers -= num_peers_max;
1158     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
1159                   num_peers * sizeof (struct GNUNET_PeerIdentity);
1160     /* Set pointer to beginning of next block of num_peers_max peers */
1161     tmp_peer_pointer = &peer_ids[num_peers_max];
1162   }
1163
1164   ev = GNUNET_MQ_msg_extra (msg,
1165                             num_peers * sizeof (struct GNUNET_PeerIdentity),
1166                             GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1167   msg->type = htonl (type);
1168   msg->num_peers = htonl (num_peers);
1169   if ( (2 == type) ||
1170        (3 == type) )
1171     msg->attacked_peer = *target_peer;
1172   GNUNET_memcpy (&msg[1],
1173                  tmp_peer_pointer,
1174                  num_peers * sizeof (struct GNUNET_PeerIdentity));
1175
1176   GNUNET_MQ_send (h->mq, ev);
1177 }
1178 #endif /* ENABLE_MALICIOUS */
1179
1180
1181 /**
1182  * Cancle an issued request.
1183  *
1184  * @param rh request handle of request to cancle
1185  */
1186 void
1187 GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1188 {
1189   struct GNUNET_RPS_Handle *h;
1190
1191   h = rh->rps_handle;
1192   GNUNET_assert (NULL != rh);
1193   GNUNET_assert (NULL != rh->srh);
1194   GNUNET_assert (h == rh->srh->rps_handle);
1195   GNUNET_RPS_stream_cancel (rh->srh);
1196   rh->srh = NULL;
1197   if (NULL == h->stream_requests_head) cancel_stream(h);
1198   if (NULL != rh->sampler_rh)
1199   {
1200     RPS_sampler_request_cancel (rh->sampler_rh);
1201   }
1202   RPS_sampler_destroy (rh->sampler);
1203   rh->sampler = NULL;
1204   GNUNET_CONTAINER_DLL_remove (h->rh_head,
1205                                h->rh_tail,
1206                                rh);
1207   GNUNET_free (rh);
1208 }
1209
1210
1211 /**
1212  * Cancle an issued single info request.
1213  *
1214  * @param rhs request handle of request to cancle
1215  */
1216 void
1217 GNUNET_RPS_request_single_info_cancel (
1218     struct GNUNET_RPS_Request_Handle_Single_Info *rhs)
1219 {
1220   struct GNUNET_RPS_Handle *h;
1221
1222   h = rhs->rps_handle;
1223   GNUNET_assert (NULL != rhs);
1224   GNUNET_assert (NULL != rhs->srh);
1225   GNUNET_assert (h == rhs->srh->rps_handle);
1226   GNUNET_RPS_stream_cancel (rhs->srh);
1227   rhs->srh = NULL;
1228   if (NULL == h->stream_requests_head) cancel_stream(h);
1229   if (NULL != rhs->sampler_rh)
1230   {
1231     RPS_sampler_request_single_info_cancel (rhs->sampler_rh);
1232   }
1233   RPS_sampler_destroy (rhs->sampler);
1234   rhs->sampler = NULL;
1235   GNUNET_CONTAINER_DLL_remove (h->rhs_head,
1236                                h->rhs_tail,
1237                                rhs);
1238   GNUNET_free (rhs);
1239 }
1240
1241
1242 /**
1243  * Disconnect from the rps service
1244  *
1245  * @param h the handle to the rps service
1246  */
1247 void
1248 GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
1249 {
1250   if (NULL != h->stream_requests_head)
1251   {
1252     struct GNUNET_RPS_StreamRequestHandle *srh_next;
1253
1254     LOG (GNUNET_ERROR_TYPE_WARNING,
1255         "Still waiting for replies\n");
1256     for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
1257          NULL != srh_iter;
1258          srh_iter = srh_next)
1259     {
1260       srh_next = srh_iter->next;
1261       GNUNET_RPS_stream_cancel (srh_iter);
1262     }
1263   }
1264   if (NULL != h->rh_head)
1265   {
1266     LOG (GNUNET_ERROR_TYPE_WARNING,
1267          "Not all requests were cancelled!\n");
1268     for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
1269          h->rh_head != NULL;
1270          rh_iter = h->rh_head)
1271     {
1272       GNUNET_RPS_request_cancel (rh_iter);
1273     }
1274   }
1275   if (NULL != h->rhs_head)
1276   {
1277     LOG (GNUNET_ERROR_TYPE_WARNING,
1278          "Not all requests were cancelled!\n");
1279     for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1280          h->rhs_head != NULL;
1281          rhs_iter = h->rhs_head)
1282     {
1283       GNUNET_RPS_request_single_info_cancel (rhs_iter);
1284     }
1285   }
1286   if (NULL != srh_callback_peers)
1287   {
1288     GNUNET_free (srh_callback_peers);
1289     srh_callback_peers = NULL;
1290   }
1291   if (NULL != h->view_update_cb)
1292   {
1293     LOG (GNUNET_ERROR_TYPE_WARNING,
1294         "Still waiting for view updates\n");
1295     GNUNET_RPS_view_request_cancel (h);
1296   }
1297   if (NULL != h->nse)
1298     GNUNET_NSE_disconnect (h->nse);
1299   GNUNET_MQ_destroy (h->mq);
1300   GNUNET_free (h);
1301 }
1302
1303
1304 /* end of rps_api.c */