error handling
[oweals/gnunet.git] / src / rps / rps_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file rps/rps_api.c
23  * @brief API for rps
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29 #include "gnunet_rps_service.h"
30 #include "rps-sampler_client.h"
31
32 #include "gnunet_nse_service.h"
33
34 #include <inttypes.h>
35
36 #define LOG(kind, ...) GNUNET_log_from (kind, "rps-api", __VA_ARGS__)
37
38 /**
39  * Handle for a request to get peers from biased stream of ids
40  */
41 struct GNUNET_RPS_StreamRequestHandle
42 {
43   /**
44    * The client issuing the request.
45    */
46   struct GNUNET_RPS_Handle *rps_handle;
47
48   /**
49    * The callback to be called when we receive an answer.
50    */
51   GNUNET_RPS_NotifyReadyCB ready_cb;
52
53   /**
54    * The closure for the callback.
55    */
56   void *ready_cb_cls;
57
58   /**
59    * @brief Scheduler task for scheduled callback
60    */
61   struct GNUNET_SCHEDULER_Task *callback_task;
62
63   /**
64    * @brief Next element of the DLL
65    */
66   struct GNUNET_RPS_StreamRequestHandle *next;
67
68   /**
69    * @brief Previous element of the DLL
70    */
71   struct GNUNET_RPS_StreamRequestHandle *prev;
72 };
73
74
75 /**
76  * Handler to handle requests from a client.
77  */
78 struct GNUNET_RPS_Handle
79 {
80   /**
81    * The handle to the client configuration.
82    */
83   const struct GNUNET_CONFIGURATION_Handle *cfg;
84
85   /**
86    * The message queue to the client.
87    */
88   struct GNUNET_MQ_Handle *mq;
89
90   /**
91    * @brief Callback called on each update of the view
92    */
93   GNUNET_RPS_NotifyReadyCB view_update_cb;
94
95   /**
96    * @brief Closure to each requested update of the view
97    */
98   void *view_update_cls;
99
100   /**
101    * @brief Closure to each requested peer from the biased stream
102    */
103   void *stream_input_cls;
104
105   /**
106    * @brief Head of the DLL of stream requests
107    */
108   struct GNUNET_RPS_StreamRequestHandle *stream_requests_head;
109
110   /**
111    * @brief Tail of the DLL of stream requests
112    */
113   struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
114
115   /**
116    * @brief Handle to nse service
117    */
118   struct GNUNET_NSE_Handle *nse;
119
120   /**
121    * @brief Pointer to the head element in DLL of request handles
122    */
123   struct GNUNET_RPS_Request_Handle *rh_head;
124
125   /**
126    * @brief Pointer to the tail element in DLL of request handles
127    */
128   struct GNUNET_RPS_Request_Handle *rh_tail;
129
130   /**
131    * @brief Pointer to the head element in DLL of single request handles
132    */
133   struct GNUNET_RPS_Request_Handle_Single_Info *rhs_head;
134
135   /**
136    * @brief Pointer to the tail element in DLL of single request handles
137    */
138   struct GNUNET_RPS_Request_Handle_Single_Info *rhs_tail;
139
140   /**
141    * @brief The desired probability with which we want to have observed all
142    * peers.
143    */
144   float desired_probability;
145
146   /**
147    * @brief A factor that catches the 'bias' of a random stream of peer ids.
148    *
149    * As introduced by Brahms: Factor between the number of unique ids in a
150    * truly random stream and number of unique ids in the gossip stream.
151    */
152   float deficiency_factor;
153 };
154
155
156 /**
157  * Handler for a single request from a client.
158  */
159 struct GNUNET_RPS_Request_Handle
160 {
161   /**
162    * The client issuing the request.
163    */
164   struct GNUNET_RPS_Handle *rps_handle;
165
166   /**
167    * The number of requested peers.
168    */
169   uint32_t num_requests;
170
171   /**
172    * @brief The Sampler for the client request
173    */
174   struct RPS_Sampler *sampler;
175
176   /**
177    * @brief Request handle of the request to the sampler - needed to cancel the request
178    */
179   struct RPS_SamplerRequestHandle *sampler_rh;
180
181   /**
182    * @brief Request handle of the request of the biased stream of peers -
183    * needed to cancel the request
184    */
185   struct GNUNET_RPS_StreamRequestHandle *srh;
186
187   /**
188    * The callback to be called when we receive an answer.
189    */
190   GNUNET_RPS_NotifyReadyCB ready_cb;
191
192   /**
193    * The closure for the callback.
194    */
195   void *ready_cb_cls;
196
197   /**
198    * @brief Pointer to next element in DLL
199    */
200   struct GNUNET_RPS_Request_Handle *next;
201
202   /**
203    * @brief Pointer to previous element in DLL
204    */
205   struct GNUNET_RPS_Request_Handle *prev;
206 };
207
208
209 /**
210  * Handler for a single request from a client.
211  */
212 struct GNUNET_RPS_Request_Handle_Single_Info
213 {
214   /**
215    * The client issuing the request.
216    */
217   struct GNUNET_RPS_Handle *rps_handle;
218
219   /**
220    * @brief The Sampler for the client request
221    */
222   struct RPS_Sampler *sampler;
223
224   /**
225    * @brief Request handle of the request to the sampler - needed to cancel the request
226    */
227   struct RPS_SamplerRequestHandleSingleInfo *sampler_rh;
228
229   /**
230    * @brief Request handle of the request of the biased stream of peers -
231    * needed to cancel the request
232    */
233   struct GNUNET_RPS_StreamRequestHandle *srh;
234
235   /**
236    * The callback to be called when we receive an answer.
237    */
238   GNUNET_RPS_NotifyReadySingleInfoCB ready_cb;
239
240   /**
241    * The closure for the callback.
242    */
243   void *ready_cb_cls;
244
245   /**
246    * @brief Pointer to next element in DLL
247    */
248   struct GNUNET_RPS_Request_Handle_Single_Info *next;
249
250   /**
251    * @brief Pointer to previous element in DLL
252    */
253   struct GNUNET_RPS_Request_Handle_Single_Info *prev;
254 };
255
256
257 /**
258  * Struct used to pack the callback, its closure (provided by the caller)
259  * and the connection handler to the service to pass it to a callback function.
260  */
261 struct cb_cls_pack
262 {
263   /**
264    * Callback provided by the client
265    */
266   GNUNET_RPS_NotifyReadyCB cb;
267
268   /**
269    * Closure provided by the client
270    */
271   void *cls;
272
273   /**
274    * Handle to the service connection
275    */
276   struct GNUNET_CLIENT_Connection *service_conn;
277 };
278
279
280 /**
281  * @brief Peers received from the biased stream to be passed to all
282  * srh_handlers
283  */
284 static struct GNUNET_PeerIdentity *srh_callback_peers;
285
286 /**
287  * @brief Number of peers in the biased stream that are to be passed to all
288  * srh_handlers
289  */
290 static uint64_t srh_callback_num_peers;
291
292
293 /**
294  * @brief Create a new handle for a stream request
295  *
296  * @param rps_handle The rps handle
297  * @param num_peers The number of desired peers
298  * @param ready_cb The callback to be called, once all peers are ready
299  * @param cls The colsure to provide to the callback
300  *
301  * @return The handle to the stream request
302  */
303 static struct GNUNET_RPS_StreamRequestHandle *
304 new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
305                     GNUNET_RPS_NotifyReadyCB ready_cb,
306                     void *cls)
307 {
308   struct GNUNET_RPS_StreamRequestHandle *srh;
309
310   srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
311   srh->rps_handle = rps_handle;
312   srh->ready_cb = ready_cb;
313   srh->ready_cb_cls = cls;
314   GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
315                                rps_handle->stream_requests_tail,
316                                srh);
317
318   return srh;
319 }
320
321
322 /**
323  * @brief Remove the given stream request from the list of requests and memory
324  *
325  * @param srh The request to be removed
326  */
327 static void
328 remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh)
329 {
330   struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle;
331
332   GNUNET_assert (NULL != srh);
333   if (NULL != srh->callback_task)
334   {
335     GNUNET_SCHEDULER_cancel (srh->callback_task);
336     srh->callback_task = NULL;
337   }
338   GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
339                                rps_handle->stream_requests_tail,
340                                srh);
341   GNUNET_free (srh);
342 }
343
344
345 /**
346  * @brief Called once the sampler has collected all requested peers.
347  *
348  * Calls the callback provided by the client with the corresponding cls.
349  *
350  * @param peers The array of @a num_peers that has been returned.
351  * @param num_peers The number of peers that have been returned
352  * @param cls The #GNUNET_RPS_Request_Handle
353  */
354 static void
355 peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
356                 uint32_t num_peers,
357                 void *cls)
358 {
359   struct GNUNET_RPS_Request_Handle *rh = cls;
360
361   rh->sampler_rh = NULL;
362   rh->ready_cb (rh->ready_cb_cls,
363                 num_peers,
364                 peers);
365   GNUNET_RPS_request_cancel (rh);
366 }
367
368
369 /**
370  * @brief Called once the sampler has collected the requested peer.
371  *
372  * Calls the callback provided by the client with the corresponding cls.
373  *
374  * @param peers The array of @a num_peers that has been returned.
375  * @param num_peers The number of peers that have been returned
376  * @param cls The #GNUNET_RPS_Request_Handle
377  * @param probability Probability with which all IDs have been observed
378  * @param num_observed Number of observed IDs
379  */
380 static void
381 peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers,
382                     void *cls,
383                     double probability,
384                     uint32_t num_observed)
385 {
386   struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls;
387
388   rh->sampler_rh = NULL;
389   rh->ready_cb (rh->ready_cb_cls,
390                 peers,
391                 probability,
392                 num_observed);
393   GNUNET_RPS_request_single_info_cancel (rh);
394 }
395
396
397 /**
398  * @brief Callback to collect the peers from the biased stream and put those
399  * into the sampler.
400  *
401  * @param cls The #GNUNET_RPS_Request_Handle
402  * @param num_peers The number of peer that have been returned
403  * @param peers The array of @a num_peers that have been returned
404  */
405 static void
406 collect_peers_cb (void *cls,
407                   uint64_t num_peers,
408                   const struct GNUNET_PeerIdentity *peers)
409 {
410   struct GNUNET_RPS_Request_Handle *rh = cls;
411
412   LOG (GNUNET_ERROR_TYPE_DEBUG,
413        "Service sent %" PRIu64 " peers from stream\n",
414        num_peers);
415   for (uint64_t i = 0; i < num_peers; i++)
416   {
417     RPS_sampler_update (rh->sampler, &peers[i]);
418   }
419 }
420
421
422 /**
423  * @brief Callback to collect the peers from the biased stream and put those
424  * into the sampler.
425  *
426  * This version is for the modified #GNUNET_RPS_Request_Handle_Single_Info
427  *
428  * @param cls The #GNUNET_RPS_Request_Handle
429  * @param num_peers The number of peer that have been returned
430  * @param peers The array of @a num_peers that have been returned
431  */
432 static void
433 collect_peers_info_cb (void *cls,
434                        uint64_t num_peers,
435                        const struct GNUNET_PeerIdentity *peers)
436 {
437   struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls;
438
439   LOG (GNUNET_ERROR_TYPE_DEBUG,
440        "Service sent %" PRIu64 " peers from stream\n",
441        num_peers);
442   for (uint64_t i = 0; i < num_peers; i++)
443   {
444     RPS_sampler_update (rhs->sampler, &peers[i]);
445   }
446 }
447
448
449 /* Get internals for debugging/profiling purposes */
450
451 /**
452  * Request updates of view
453  *
454  * @param rps_handle handle to the rps service
455  * @param num_req_peers number of peers we want to receive
456  *        (0 for infinite updates)
457  * @param cls a closure that will be given to the callback
458  * @param ready_cb the callback called when the peers are available
459  */
460 void
461 GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
462                          uint32_t num_updates,
463                          GNUNET_RPS_NotifyReadyCB view_update_cb,
464                          void *cls)
465 {
466   struct GNUNET_MQ_Envelope *ev;
467   struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg;
468
469   LOG (GNUNET_ERROR_TYPE_DEBUG,
470        "Client requests %" PRIu32 " view updates\n",
471        num_updates);
472   rps_handle->view_update_cb = view_update_cb;
473   rps_handle->view_update_cls = cls;
474
475   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST);
476   msg->num_updates = htonl (num_updates);
477   GNUNET_MQ_send (rps_handle->mq, ev);
478 }
479
480
481 void
482 GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle)
483 {
484   struct GNUNET_MQ_Envelope *ev;
485
486   GNUNET_assert (NULL != rps_handle->view_update_cb);
487
488   rps_handle->view_update_cb = NULL;
489
490   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL);
491   GNUNET_MQ_send (rps_handle->mq, ev);
492 }
493
494
495 /**
496  * Request biased stream of peers that are being put into the sampler
497  *
498  * @param rps_handle handle to the rps service
499  * @param cls a closure that will be given to the callback
500  * @param ready_cb the callback called when the peers are available
501  */
502 struct GNUNET_RPS_StreamRequestHandle *
503 GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
504                            GNUNET_RPS_NotifyReadyCB stream_input_cb,
505                            void *cls)
506 {
507   struct GNUNET_RPS_StreamRequestHandle *srh;
508   struct GNUNET_MQ_Envelope *ev;
509   struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
510
511   srh = new_stream_request (rps_handle,
512                             stream_input_cb,
513                             cls);
514   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
515
516   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
517   GNUNET_MQ_send (rps_handle->mq, ev);
518   return srh;
519 }
520
521
522 /**
523  * This function is called, when the service updates the view.
524  * It verifies that @a msg is well-formed.
525  *
526  * @param cls the closure
527  * @param msg the message
528  * @return #GNUNET_OK if @a msg is well-formed
529  */
530 static int
531 check_view_update (void *cls,
532                    const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
533 {
534   uint16_t msize = ntohs (msg->header.size);
535   uint32_t num_peers = ntohl (msg->num_peers);
536
537   (void) cls;
538
539   msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_ViewReply);
540   if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
541       (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
542   {
543     GNUNET_break (0);
544     return GNUNET_SYSERR;
545   }
546   return GNUNET_OK;
547 }
548
549
550 /**
551  * This function is called, when the service updated its view.
552  * It calls the callback the caller provided
553  * and disconnects afterwards.
554  *
555  * @param msg the message
556  */
557 static void
558 handle_view_update (void *cls,
559                     const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
560 {
561   struct GNUNET_RPS_Handle *h = cls;
562   struct GNUNET_PeerIdentity *peers;
563
564   /* Give the peers back */
565   LOG (GNUNET_ERROR_TYPE_DEBUG,
566        "New view of %" PRIu32 " peers:\n",
567        ntohl (msg->num_peers));
568
569   peers = (struct GNUNET_PeerIdentity *) &msg[1];
570   GNUNET_assert (NULL != h);
571   GNUNET_assert (NULL != h->view_update_cb);
572   h->view_update_cb (h->view_update_cls, ntohl (msg->num_peers), peers);
573 }
574
575
576 /**
577  * @brief Send message to service that this client does not want to receive
578  * further updates from the biased peer stream
579  *
580  * @param rps_handle The handle representing the service to the client
581  */
582 static void
583 cancel_stream (struct GNUNET_RPS_Handle *rps_handle)
584 {
585   struct GNUNET_MQ_Envelope *ev;
586
587   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL);
588   GNUNET_MQ_send (rps_handle->mq, ev);
589 }
590
591
592 /**
593  * @brief Cancel a specific request for updates from the biased peer stream
594  *
595  * @param srh The request handle to cancel
596  */
597 void
598 GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
599 {
600   struct GNUNET_RPS_Handle *rps_handle;
601
602   rps_handle = srh->rps_handle;
603   remove_stream_request (srh);
604   if (NULL == rps_handle->stream_requests_head)
605     cancel_stream (rps_handle);
606 }
607
608
609 /**
610  * This function is called, when the service sends another peer from the biased
611  * stream.
612  * It calls the callback the caller provided
613  * and disconnects afterwards.
614  *
615  * TODO merge with check_view_update
616  *
617  * @param msg the message
618  */
619 static int
620 check_stream_input (void *cls,
621                     const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
622 {
623   uint16_t msize = ntohs (msg->header.size);
624   uint32_t num_peers = ntohl (msg->num_peers);
625
626   (void) cls;
627
628   msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_StreamReply);
629   if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
630       (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
631   {
632     GNUNET_break (0);
633     return GNUNET_SYSERR;
634   }
635   return GNUNET_OK;
636 }
637
638
639 /**
640  * @brief Called by the scheduler to call the callbacks of the srh handlers
641  *
642  * @param cls Stream request handle
643  */
644 static void
645 srh_callback_scheduled (void *cls)
646 {
647   struct GNUNET_RPS_StreamRequestHandle *srh = cls;
648
649   srh->callback_task = NULL;
650   srh->ready_cb (srh->ready_cb_cls,
651                  srh_callback_num_peers,
652                  srh_callback_peers);
653 }
654
655
656 /**
657  * This function is called, when the service sends another peer from the biased
658  * stream.
659  * It calls the callback the caller provided
660  * and disconnects afterwards.
661  *
662  * @param msg the message
663  */
664 static void
665 handle_stream_input (void *cls,
666                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
667 {
668   struct GNUNET_RPS_Handle *h = cls;
669   // const struct GNUNET_PeerIdentity *peers;
670   uint64_t num_peers;
671   struct GNUNET_RPS_StreamRequestHandle *srh_iter;
672   struct GNUNET_RPS_StreamRequestHandle *srh_next;
673
674   // peers = (struct GNUNET_PeerIdentity *) &msg[1];
675   num_peers = ntohl (msg->num_peers);
676   srh_callback_num_peers = num_peers;
677   GNUNET_free_non_null (srh_callback_peers);
678   srh_callback_peers = GNUNET_new_array (num_peers,
679                                          struct GNUNET_PeerIdentity);
680   GNUNET_memcpy (srh_callback_peers,
681                  &msg[1],
682                  num_peers * sizeof(struct GNUNET_PeerIdentity));
683   LOG (GNUNET_ERROR_TYPE_DEBUG,
684        "Received %" PRIu64 " peer(s) from stream input.\n",
685        num_peers);
686   for (srh_iter = h->stream_requests_head;
687        NULL != srh_iter;
688        srh_iter = srh_next)
689   {
690     LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
691     /* Store next pointer - srh might be removed/freed in callback */
692     srh_next = srh_iter->next;
693     if (NULL != srh_iter->callback_task)
694       GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
695     srh_iter->callback_task =
696       GNUNET_SCHEDULER_add_now (&srh_callback_scheduled,
697                                 srh_iter);
698   }
699
700   if (NULL == h->stream_requests_head)
701   {
702     cancel_stream (h);
703   }
704 }
705
706
707 /**
708  * Reconnect to the service
709  */
710 static void
711 reconnect (struct GNUNET_RPS_Handle *h);
712
713
714 /**
715  * Error handler for mq.
716  *
717  * This function is called whan mq encounters an error.
718  * Until now mq doesn't provide useful error messages.
719  *
720  * @param cls the closure
721  * @param error error code without specyfied meaning
722  */
723 static void
724 mq_error_handler (void *cls,
725                   enum GNUNET_MQ_Error error)
726 {
727   struct GNUNET_RPS_Handle *h = cls;
728
729   // TODO LOG
730   LOG (GNUNET_ERROR_TYPE_WARNING,
731        "Problem with message queue. error: %i\n\
732        1: READ,\n\
733        2: WRITE,\n\
734        4: TIMEOUT\n",
735        // TODO: write GNUNET_MQ_strerror (error)
736        error);
737   reconnect (h);
738   /* Resend all pending request as the service destroyed its knowledge
739    * about them */
740 }
741
742
743 /**
744  * @brief Create the hash value from the share value that defines the sub
745  * (-group)
746  *
747  * @param share_val Share value
748  * @param hash[out] Pointer to the location in which the hash will be stored.
749  */
750 static void
751 hash_from_share_val (const char *share_val,
752                      struct GNUNET_HashCode *hash)
753 {
754   GNUNET_CRYPTO_kdf (hash,
755                      sizeof(struct GNUNET_HashCode),
756                      "rps",
757                      strlen ("rps"),
758                      share_val,
759                      strlen (share_val),
760                      NULL, 0);
761 }
762
763
764 /**
765  * @brief Callback for network size estimate - called with new estimates about
766  * the network size, updates all samplers with the new estimate
767  *
768  * Implements #GNUNET_NSE_Callback
769  *
770  * @param cls the rps handle
771  * @param timestamp unused
772  * @param logestimate the estimate
773  * @param std_dev the standard distribution
774  */
775 static void
776 nse_cb (void *cls,
777         struct GNUNET_TIME_Absolute timestamp,
778         double logestimate,
779         double std_dev)
780 {
781   struct GNUNET_RPS_Handle *h = cls;
782
783   (void) timestamp;
784   (void) std_dev;
785
786   for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
787        NULL != rh_iter && NULL != rh_iter->next;
788        rh_iter = rh_iter->next)
789   {
790     RPS_sampler_update_with_nw_size (rh_iter->sampler,
791                                      GNUNET_NSE_log_estimate_to_n (
792                                        logestimate));
793   }
794   for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
795        NULL != rhs_iter && NULL != rhs_iter->next;
796        rhs_iter = rhs_iter->next)
797   {
798     RPS_sampler_update_with_nw_size (rhs_iter->sampler,
799                                      GNUNET_NSE_log_estimate_to_n (
800                                        logestimate));
801   }
802 }
803
804
805 /**
806  * Reconnect to the service
807  */
808 static void
809 reconnect (struct GNUNET_RPS_Handle *h)
810 {
811   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
812     GNUNET_MQ_hd_var_size (view_update,
813                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
814                            struct GNUNET_RPS_CS_DEBUG_ViewReply,
815                            h),
816     GNUNET_MQ_hd_var_size (stream_input,
817                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
818                            struct GNUNET_RPS_CS_DEBUG_StreamReply,
819                            h),
820     GNUNET_MQ_handler_end ()
821   };
822
823   if (NULL != h->mq)
824     GNUNET_MQ_destroy (h->mq);
825   h->mq = GNUNET_CLIENT_connect (h->cfg,
826                                  "rps",
827                                  mq_handlers,
828                                  &mq_error_handler,
829                                  h);
830   if (NULL != h->nse)
831     GNUNET_NSE_disconnect (h->nse);
832   h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h);
833 }
834
835
836 /**
837  * Connect to the rps service
838  *
839  * @param cfg configuration to use
840  * @return a handle to the service, NULL on error
841  */
842 struct GNUNET_RPS_Handle *
843 GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
844 {
845   struct GNUNET_RPS_Handle *h;
846
847   h = GNUNET_new (struct GNUNET_RPS_Handle);
848   h->cfg = cfg;
849   if (GNUNET_OK !=
850       GNUNET_CONFIGURATION_get_value_float (cfg,
851                                             "RPS",
852                                             "DESIRED_PROBABILITY",
853                                             &h->desired_probability))
854   {
855     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
856                                "RPS", "DESIRED_PROBABILITY");
857     GNUNET_free (h);
858     return NULL;
859   }
860   if ((0 > h->desired_probability) ||
861       (1 < h->desired_probability) )
862   {
863     LOG (GNUNET_ERROR_TYPE_ERROR,
864          "The desired probability must be in the interval [0;1]\n");
865     GNUNET_free (h);
866     return NULL;
867   }
868   if (GNUNET_OK !=
869       GNUNET_CONFIGURATION_get_value_float (cfg,
870                                             "RPS",
871                                             "DEFICIENCY_FACTOR",
872                                             &h->deficiency_factor))
873   {
874     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
875                                "RPS", "DEFICIENCY_FACTOR");
876     GNUNET_free (h);
877     return NULL;
878   }
879   if ((0 > h->desired_probability) ||
880       (1 < h->desired_probability) )
881   {
882     LOG (GNUNET_ERROR_TYPE_ERROR,
883          "The deficiency factor must be in the interval [0;1]\n");
884     GNUNET_free (h);
885     return NULL;
886   }
887   reconnect (h);
888   if (NULL == h->mq)
889   {
890     GNUNET_free (h);
891     return NULL;
892   }
893   return h;
894 }
895
896
897 /**
898  * @brief Start a sub with the given shared value
899  *
900  * @param h Handle to rps
901  * @param shared_value The shared value that defines the members of the sub (-gorup)
902  */
903 void
904 GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h,
905                       const char *shared_value)
906 {
907   struct GNUNET_RPS_CS_SubStartMessage *msg;
908   struct GNUNET_MQ_Envelope *ev;
909
910   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START);
911   hash_from_share_val (shared_value, &msg->hash);
912   msg->round_interval = GNUNET_TIME_relative_hton ( // TODO read from config!
913     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30));
914   GNUNET_assert (0 != msg->round_interval.rel_value_us__);
915
916   GNUNET_MQ_send (h->mq, ev);
917 }
918
919
920 /**
921  * @brief Stop a sub with the given shared value
922  *
923  * @param h Handle to rps
924  * @param shared_value The shared value that defines the members of the sub (-gorup)
925  */
926 void
927 GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h,
928                      const char *shared_value)
929 {
930   struct GNUNET_RPS_CS_SubStopMessage *msg;
931   struct GNUNET_MQ_Envelope *ev;
932
933   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP);
934   hash_from_share_val (shared_value, &msg->hash);
935
936   GNUNET_MQ_send (h->mq, ev);
937 }
938
939
940 /**
941  * Request n random peers.
942  *
943  * @param rps_handle handle to the rps service
944  * @param num_req_peers number of peers we want to receive
945  * @param ready_cb the callback called when the peers are available
946  * @param cls closure given to the callback
947  * @return a handle to cancel this request
948  */
949 struct GNUNET_RPS_Request_Handle *
950 GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
951                           uint32_t num_req_peers,
952                           GNUNET_RPS_NotifyReadyCB ready_cb,
953                           void *cls)
954 {
955   struct GNUNET_RPS_Request_Handle *rh;
956
957   LOG (GNUNET_ERROR_TYPE_INFO,
958        "Client requested %" PRIu32 " peers\n",
959        num_req_peers);
960   rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
961   rh->rps_handle = rps_handle;
962   rh->num_requests = num_req_peers;
963   rh->sampler = RPS_sampler_mod_init (num_req_peers,
964                                       GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
965   RPS_sampler_set_desired_probability (rh->sampler,
966                                        rps_handle->desired_probability);
967   RPS_sampler_set_deficiency_factor (rh->sampler,
968                                      rps_handle->deficiency_factor);
969   rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
970                                                  num_req_peers,
971                                                  peers_ready_cb,
972                                                  rh);
973   rh->srh = GNUNET_RPS_stream_request (rps_handle,
974                                        collect_peers_cb,
975                                        rh); /* cls */
976   rh->ready_cb = ready_cb;
977   rh->ready_cb_cls = cls;
978   GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head,
979                                rps_handle->rh_tail,
980                                rh);
981
982   return rh;
983 }
984
985
986 /**
987  * Request one random peer, getting additional information.
988  *
989  * @param rps_handle handle to the rps service
990  * @param ready_cb the callback called when the peers are available
991  * @param cls closure given to the callback
992  * @return a handle to cancel this request
993  */
994 struct GNUNET_RPS_Request_Handle_Single_Info *
995 GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle,
996                               GNUNET_RPS_NotifyReadySingleInfoCB ready_cb,
997                               void *cls)
998 {
999   struct GNUNET_RPS_Request_Handle_Single_Info *rhs;
1000   uint32_t num_req_peers = 1;
1001
1002   LOG (GNUNET_ERROR_TYPE_INFO,
1003        "Client requested peer with additional info\n");
1004   rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info);
1005   rhs->rps_handle = rps_handle;
1006   rhs->sampler = RPS_sampler_mod_init (num_req_peers,
1007                                        GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
1008   RPS_sampler_set_desired_probability (rhs->sampler,
1009                                        rps_handle->desired_probability);
1010   RPS_sampler_set_deficiency_factor (rhs->sampler,
1011                                      rps_handle->deficiency_factor);
1012   rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler,
1013                                                     peer_info_ready_cb,
1014                                                     rhs);
1015   rhs->srh = GNUNET_RPS_stream_request (rps_handle,
1016                                         collect_peers_info_cb,
1017                                         rhs); /* cls */
1018   rhs->ready_cb = ready_cb;
1019   rhs->ready_cb_cls = cls;
1020   GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head,
1021                                rps_handle->rhs_tail,
1022                                rhs);
1023
1024   return rhs;
1025 }
1026
1027
1028 /**
1029  * Seed rps service with peerIDs.
1030  *
1031  * @param h handle to the rps service
1032  * @param n number of peers to seed
1033  * @param ids the ids of the peers seeded
1034  */
1035 void
1036 GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
1037                      uint32_t n,
1038                      const struct GNUNET_PeerIdentity *ids)
1039 {
1040   size_t size_needed;
1041   uint32_t num_peers_max;
1042   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
1043   struct GNUNET_MQ_Envelope *ev;
1044   struct GNUNET_RPS_CS_SeedMessage *msg;
1045
1046   LOG (GNUNET_ERROR_TYPE_DEBUG,
1047        "Client wants to seed %" PRIu32 " peers:\n",
1048        n);
1049   for (unsigned int i = 0; i < n; i++)
1050     LOG (GNUNET_ERROR_TYPE_DEBUG,
1051          "%u. peer: %s\n",
1052          i,
1053          GNUNET_i2s (&ids[i]));
1054
1055   /* The actual size the message occupies */
1056   size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage)
1057                 + n * sizeof(struct GNUNET_PeerIdentity);
1058   /* The number of peers that fits in one message together with
1059    * the respective header */
1060   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE
1061                    - sizeof(struct GNUNET_RPS_CS_SeedMessage))
1062                   / sizeof(struct GNUNET_PeerIdentity);
1063   tmp_peer_pointer = ids;
1064
1065   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1066   {
1067     ev = GNUNET_MQ_msg_extra (msg,
1068                               num_peers_max * sizeof(struct
1069                                                      GNUNET_PeerIdentity),
1070                               GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
1071     msg->num_peers = htonl (num_peers_max);
1072     GNUNET_memcpy (&msg[1],
1073                    tmp_peer_pointer,
1074                    num_peers_max * sizeof(struct GNUNET_PeerIdentity));
1075     GNUNET_MQ_send (h->mq,
1076                     ev);
1077     n -= num_peers_max;
1078     size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage)
1079                   + n * sizeof(struct GNUNET_PeerIdentity);
1080     /* Set pointer to beginning of next block of num_peers_max peers */
1081     tmp_peer_pointer = &ids[num_peers_max];
1082   }
1083
1084   ev = GNUNET_MQ_msg_extra (msg,
1085                             n * sizeof(struct GNUNET_PeerIdentity),
1086                             GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
1087   msg->num_peers = htonl (n);
1088   GNUNET_memcpy (&msg[1],
1089                  tmp_peer_pointer,
1090                  n * sizeof(struct GNUNET_PeerIdentity));
1091   GNUNET_MQ_send (h->mq,
1092                   ev);
1093 }
1094
1095
1096 #if ENABLE_MALICIOUS
1097 /**
1098  * Turn RPS service to act malicious.
1099  *
1100  * @param h handle to the rps service
1101  * @param type which type of malicious peer to turn to.
1102  *             0 Don't act malicious at all
1103  *             1 Try to maximise representation
1104  *             2 Try to partition the network
1105  *               (isolate one peer from the rest)
1106  * @param n number of @a ids
1107  * @param ids the ids of the malicious peers
1108  *            if @type is 2 the last id is the id of the
1109  *            peer to be isolated from the rest
1110  */
1111 void
1112 GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
1113                           uint32_t type,
1114                           uint32_t num_peers,
1115                           const struct GNUNET_PeerIdentity *peer_ids,
1116                           const struct GNUNET_PeerIdentity *target_peer)
1117 {
1118   size_t size_needed;
1119   uint32_t num_peers_max;
1120   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
1121   struct GNUNET_MQ_Envelope *ev;
1122   struct GNUNET_RPS_CS_ActMaliciousMessage *msg;
1123
1124   unsigned int i;
1125
1126   LOG (GNUNET_ERROR_TYPE_DEBUG,
1127        "Client turns malicious (type %" PRIu32 ") with %" PRIu32
1128        " other peers:\n",
1129        type,
1130        num_peers);
1131   for (i = 0; i < num_peers; i++)
1132     LOG (GNUNET_ERROR_TYPE_DEBUG,
1133          "%u. peer: %s\n",
1134          i,
1135          GNUNET_i2s (&peer_ids[i]));
1136
1137   /* The actual size the message would occupy */
1138   size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage)
1139                 + num_peers * sizeof(struct GNUNET_PeerIdentity);
1140   /* The number of peers that fit in one message together with
1141    * the respective header */
1142   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE
1143                    - sizeof(struct GNUNET_RPS_CS_SeedMessage))
1144                   / sizeof(struct GNUNET_PeerIdentity);
1145   tmp_peer_pointer = peer_ids;
1146
1147   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1148   {
1149     LOG (GNUNET_ERROR_TYPE_DEBUG,
1150          "Too many peers to send at once, sending %" PRIu32
1151          " (all we can so far)\n",
1152          num_peers_max);
1153     ev = GNUNET_MQ_msg_extra (msg,
1154                               num_peers_max * sizeof(struct
1155                                                      GNUNET_PeerIdentity),
1156                               GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1157     msg->type = htonl (type);
1158     msg->num_peers = htonl (num_peers_max);
1159     if ((2 == type) ||
1160         (3 == type))
1161       msg->attacked_peer = peer_ids[num_peers];
1162     GNUNET_memcpy (&msg[1],
1163                    tmp_peer_pointer,
1164                    num_peers_max * sizeof(struct GNUNET_PeerIdentity));
1165
1166     GNUNET_MQ_send (h->mq, ev);
1167
1168     num_peers -= num_peers_max;
1169     size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage)
1170                   + num_peers * sizeof(struct GNUNET_PeerIdentity);
1171     /* Set pointer to beginning of next block of num_peers_max peers */
1172     tmp_peer_pointer = &peer_ids[num_peers_max];
1173   }
1174
1175   ev = GNUNET_MQ_msg_extra (msg,
1176                             num_peers * sizeof(struct GNUNET_PeerIdentity),
1177                             GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1178   msg->type = htonl (type);
1179   msg->num_peers = htonl (num_peers);
1180   if ((2 == type) ||
1181       (3 == type))
1182     msg->attacked_peer = *target_peer;
1183   GNUNET_memcpy (&msg[1],
1184                  tmp_peer_pointer,
1185                  num_peers * sizeof(struct GNUNET_PeerIdentity));
1186
1187   GNUNET_MQ_send (h->mq, ev);
1188 }
1189
1190
1191 #endif /* ENABLE_MALICIOUS */
1192
1193
1194 /**
1195  * Cancle an issued request.
1196  *
1197  * @param rh request handle of request to cancle
1198  */
1199 void
1200 GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1201 {
1202   struct GNUNET_RPS_Handle *h;
1203
1204   h = rh->rps_handle;
1205   GNUNET_assert (NULL != rh);
1206   GNUNET_assert (NULL != rh->srh);
1207   GNUNET_assert (h == rh->srh->rps_handle);
1208   GNUNET_RPS_stream_cancel (rh->srh);
1209   rh->srh = NULL;
1210   if (NULL == h->stream_requests_head)
1211     cancel_stream (h);
1212   if (NULL != rh->sampler_rh)
1213   {
1214     RPS_sampler_request_cancel (rh->sampler_rh);
1215   }
1216   RPS_sampler_destroy (rh->sampler);
1217   rh->sampler = NULL;
1218   GNUNET_CONTAINER_DLL_remove (h->rh_head,
1219                                h->rh_tail,
1220                                rh);
1221   GNUNET_free (rh);
1222 }
1223
1224
1225 /**
1226  * Cancle an issued single info request.
1227  *
1228  * @param rhs request handle of request to cancle
1229  */
1230 void
1231 GNUNET_RPS_request_single_info_cancel (
1232   struct GNUNET_RPS_Request_Handle_Single_Info *rhs)
1233 {
1234   struct GNUNET_RPS_Handle *h;
1235
1236   h = rhs->rps_handle;
1237   GNUNET_assert (NULL != rhs);
1238   GNUNET_assert (NULL != rhs->srh);
1239   GNUNET_assert (h == rhs->srh->rps_handle);
1240   GNUNET_RPS_stream_cancel (rhs->srh);
1241   rhs->srh = NULL;
1242   if (NULL == h->stream_requests_head)
1243     cancel_stream (h);
1244   if (NULL != rhs->sampler_rh)
1245   {
1246     RPS_sampler_request_single_info_cancel (rhs->sampler_rh);
1247   }
1248   RPS_sampler_destroy (rhs->sampler);
1249   rhs->sampler = NULL;
1250   GNUNET_CONTAINER_DLL_remove (h->rhs_head,
1251                                h->rhs_tail,
1252                                rhs);
1253   GNUNET_free (rhs);
1254 }
1255
1256
1257 /**
1258  * Disconnect from the rps service
1259  *
1260  * @param h the handle to the rps service
1261  */
1262 void
1263 GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
1264 {
1265   if (NULL != h->stream_requests_head)
1266   {
1267     struct GNUNET_RPS_StreamRequestHandle *srh_next;
1268
1269     LOG (GNUNET_ERROR_TYPE_WARNING,
1270          "Still waiting for replies\n");
1271     for (struct GNUNET_RPS_StreamRequestHandle *srh_iter =
1272            h->stream_requests_head;
1273          NULL != srh_iter;
1274          srh_iter = srh_next)
1275     {
1276       srh_next = srh_iter->next;
1277       GNUNET_RPS_stream_cancel (srh_iter);
1278     }
1279   }
1280   if (NULL != h->rh_head)
1281   {
1282     LOG (GNUNET_ERROR_TYPE_WARNING,
1283          "Not all requests were cancelled!\n");
1284     for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
1285          h->rh_head != NULL;
1286          rh_iter = h->rh_head)
1287     {
1288       GNUNET_RPS_request_cancel (rh_iter);
1289     }
1290   }
1291   if (NULL != h->rhs_head)
1292   {
1293     LOG (GNUNET_ERROR_TYPE_WARNING,
1294          "Not all requests were cancelled!\n");
1295     for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1296          h->rhs_head != NULL;
1297          rhs_iter = h->rhs_head)
1298     {
1299       GNUNET_RPS_request_single_info_cancel (rhs_iter);
1300     }
1301   }
1302   if (NULL != srh_callback_peers)
1303   {
1304     GNUNET_free (srh_callback_peers);
1305     srh_callback_peers = NULL;
1306   }
1307   if (NULL != h->view_update_cb)
1308   {
1309     LOG (GNUNET_ERROR_TYPE_WARNING,
1310          "Still waiting for view updates\n");
1311     GNUNET_RPS_view_request_cancel (h);
1312   }
1313   if (NULL != h->nse)
1314     GNUNET_NSE_disconnect (h->nse);
1315   GNUNET_MQ_destroy (h->mq);
1316   GNUNET_free (h);
1317 }
1318
1319
1320 /* end of rps_api.c */