e4f4db506a1d02044112263bd5501ecb1a572518
[oweals/gnunet.git] / src / rps / rps_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/rps_api.c
21  * @brief API for rps
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "rps.h"
27 #include "gnunet_rps_service.h"
28 #include "gnunet-service-rps_sampler.h"
29
30 #include <inttypes.h>
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
33
34 /**
35  * Handle for a request to get peers from biased stream of ids
36  */
37 struct GNUNET_RPS_StreamRequestHandle
38 {
39   /**
40    * The client issuing the request.
41    */
42   struct GNUNET_RPS_Handle *rps_handle;
43
44   /**
45    * The number of requested peers.
46    */
47   uint32_t num_peers_left;
48
49   /**
50    * The callback to be called when we receive an answer.
51    */
52   GNUNET_RPS_NotifyReadyCB ready_cb;
53
54   /**
55    * The closure for the callback.
56    */
57   void *ready_cb_cls;
58
59   /**
60    * @brief Next element of the DLL
61    */
62   struct GNUNET_RPS_StreamRequestHandle *next;
63
64   /**
65    * @brief Previous element of the DLL
66    */
67   struct GNUNET_RPS_StreamRequestHandle *prev;
68 };
69
70
71 /**
72  * Handler to handle requests from a client.
73  */
74 struct GNUNET_RPS_Handle
75 {
76   /**
77    * The handle to the client configuration.
78    */
79   const struct GNUNET_CONFIGURATION_Handle *cfg;
80
81   /**
82    * The message queue to the client.
83    */
84   struct GNUNET_MQ_Handle *mq;
85
86   /**
87    * Array of Request_Handles.
88    */
89   struct GNUNET_CONTAINER_MultiHashMap32 *req_handlers;
90
91   /**
92    * The id of the last request.
93    */
94   uint32_t current_request_id;
95
96   /**
97    * @brief Callback called on each update of the view
98    */
99   GNUNET_RPS_NotifyReadyCB view_update_cb;
100
101   /**
102    * @brief Closure to each requested update of the view
103    */
104   void *view_update_cls;
105
106   /**
107    * @brief Closure to each requested peer from the biased stream
108    */
109   void *stream_input_cls;
110
111   /**
112    * @brief Head of the DLL of stream requests
113    */
114   struct GNUNET_RPS_StreamRequestHandle *stream_requests_head;
115
116   /**
117    * @brief Tail of the DLL of stream requests
118    */
119   struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
120 };
121
122
123 /**
124  * Handler for a single request from a client.
125  */
126 struct GNUNET_RPS_Request_Handle
127 {
128   /**
129    * The client issuing the request.
130    */
131   struct GNUNET_RPS_Handle *rps_handle;
132
133   /**
134    * The id of the request.
135    */
136   uint32_t id;
137
138   /**
139    * The number of requested peers.
140    */
141   uint32_t num_requests;
142
143   /**
144    * @brief The Sampler for the client request
145    */
146   struct RPS_Sampler *sampler;
147
148   /**
149    * The callback to be called when we receive an answer.
150    */
151   GNUNET_RPS_NotifyReadyCB ready_cb;
152
153   /**
154    * The closure for the callback.
155    */
156   void *ready_cb_cls;
157 };
158
159
160 /**
161  * Struct used to pack the callback, its closure (provided by the caller)
162  * and the connection handler to the service to pass it to a callback function.
163  */
164 struct cb_cls_pack
165 {
166   /**
167    * Callback provided by the client
168    */
169   GNUNET_RPS_NotifyReadyCB cb;
170
171   /**
172    * Closure provided by the client
173    */
174   void *cls;
175
176   /**
177    * Handle to the service connection
178    */
179  struct GNUNET_CLIENT_Connection *service_conn;
180 };
181
182
183 /**
184  * @brief Create a new handle for a stream request
185  *
186  * @param rps_handle The rps handle
187  * @param num_peers The number of desired peers
188  * @param ready_cb The callback to be called, once all peers are ready
189  * @param cls The colsure to provide to the callback
190  *
191  * @return The handle to the stream request
192  */
193 static struct GNUNET_RPS_StreamRequestHandle *
194 new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
195                     uint64_t num_peers,
196                     GNUNET_RPS_NotifyReadyCB ready_cb,
197                     void *cls)
198 {
199   struct GNUNET_RPS_StreamRequestHandle *srh;
200
201   srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
202
203   srh->rps_handle = rps_handle;
204   srh->num_peers_left = num_peers;
205   srh->ready_cb = ready_cb;
206   srh->ready_cb_cls = cls;
207   GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
208                                rps_handle->stream_requests_tail,
209                                srh);
210
211   return srh;
212 }
213
214
215 /**
216  * @brief Remove the given stream request from the list of requests and memory
217  *
218  * @param srh The request to be removed
219  * @param srh_head Head of the DLL to remove request from
220  * @param srh_tail Tail of the DLL to remove request from
221  */
222 static void
223 remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
224                        struct GNUNET_RPS_StreamRequestHandle *srh_head,
225                        struct GNUNET_RPS_StreamRequestHandle *srh_tail)
226 {
227   GNUNET_CONTAINER_DLL_remove (srh_head,
228                                srh_tail,
229                                srh);
230
231   GNUNET_free (srh);
232 }
233
234
235 /**
236  * @brief Create new request handle
237  *
238  * @param rps_handle Handle to the service
239  * @param num_requests Number of requests
240  * @param ready_cb Callback
241  * @param cls Closure
242  *
243  * @return The newly created request handle
244  */
245 static struct GNUNET_RPS_Request_Handle *
246 new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
247                     uint64_t num_requests,
248                     struct RPS_Sampler *sampler,
249                     GNUNET_RPS_NotifyReadyCB ready_cb,
250                     void *cls)
251 {
252   struct GNUNET_RPS_Request_Handle *rh;
253
254   rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
255   rh->rps_handle = rps_handle;
256   rh->id = rps_handle->current_request_id++;
257   rh->num_requests = num_requests;
258   rh->sampler = sampler;
259   rh->ready_cb = ready_cb;
260   rh->ready_cb_cls = cls;
261   GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh,
262       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
263
264   return rh;
265 }
266
267
268 /**
269  * @brief Send a request to the service.
270  *
271  * @param h rps handle
272  * @param id id of the request
273  * @param num_req_peers number of peers
274  */
275 void
276 send_request (const struct GNUNET_RPS_Handle *h,
277               uint32_t id,
278               uint32_t num_req_peers)
279 {
280   struct GNUNET_MQ_Envelope *ev;
281   struct GNUNET_RPS_CS_RequestMessage *msg;
282
283   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST);
284   msg->num_peers = htonl (num_req_peers);
285   msg->id = htonl (id);
286   GNUNET_MQ_send (h->mq, ev);
287 }
288
289 /**
290  * @brief Iterator function over pending requests
291  *
292  * Implements #GNUNET_CONTAINER_HashMapIterator32
293  *
294  * @param cls rps handle
295  * @param key id of the request
296  * @param value request handle
297  *
298  * @return GNUNET_YES to continue iteration
299  */
300 int
301 resend_requests_iterator (void *cls, uint32_t key, void *value)
302 {
303   const struct GNUNET_RPS_Handle *h = cls;
304   const struct GNUNET_RPS_Request_Handle *req_handle = value;
305   (void) key;
306
307   send_request (h, req_handle->id, req_handle->num_requests);
308   return GNUNET_YES; /* continue iterating */
309 }
310
311 /**
312  * @brief Resend all pending requests
313  *
314  * This is used to resend all pending requests after the client
315  * reconnected to the service, because the service cancels all
316  * pending requests after reconnection.
317  *
318  * @param h rps handle
319  */
320 void
321 resend_requests (struct GNUNET_RPS_Handle *h)
322 {
323   GNUNET_CONTAINER_multihashmap32_iterate (h->req_handlers,
324                                            resend_requests_iterator,
325                                            h);
326 }
327
328
329 /**
330  * This function is called, when the service replies to our request.
331  * It verifies that @a msg is well-formed.
332  *
333  * @param cls the closure
334  * @param msg the message
335  * @return #GNUNET_OK if @a msg is well-formed
336  */
337 static int
338 check_reply (void *cls,
339              const struct GNUNET_RPS_CS_ReplyMessage *msg)
340 {
341   uint16_t msize = ntohs (msg->header.size);
342   uint32_t num_peers = ntohl (msg->num_peers);
343   (void) cls;
344
345   msize -= sizeof (struct GNUNET_RPS_CS_ReplyMessage);
346   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
347        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
348   {
349     GNUNET_break (0);
350     return GNUNET_SYSERR;
351   }
352   return GNUNET_OK;
353 }
354
355
356 /**
357  * This function is called, when the service replies to our request.
358  * It calls the callback the caller gave us with the provided closure
359  * and disconnects afterwards.
360  *
361  * @param cls the closure
362  * @param msg the message
363  */
364 static void
365 handle_reply (void *cls,
366               const struct GNUNET_RPS_CS_ReplyMessage *msg)
367 {
368   struct GNUNET_RPS_Handle *h = cls;
369   struct GNUNET_PeerIdentity *peers;
370   struct GNUNET_RPS_Request_Handle *rh;
371   uint32_t id;
372
373   /* Give the peers back */
374   id = ntohl (msg->id);
375   LOG (GNUNET_ERROR_TYPE_DEBUG,
376        "Service replied with %" PRIu32 " peers for id %" PRIu32 "\n",
377        ntohl (msg->num_peers),
378        id);
379
380   peers = (struct GNUNET_PeerIdentity *) &msg[1];
381   GNUNET_assert (GNUNET_YES ==
382       GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, id));
383   rh = GNUNET_CONTAINER_multihashmap32_get (h->req_handlers, id);
384   GNUNET_assert (NULL != rh);
385   GNUNET_assert (rh->num_requests == ntohl (msg->num_peers));
386   GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, id);
387   rh->ready_cb (rh->ready_cb_cls,
388                 ntohl (msg->num_peers),
389                 peers);
390 }
391
392
393 /* Get internals for debugging/profiling purposes */
394
395 /**
396  * Request updates of view
397  *
398  * @param rps_handle handle to the rps service
399  * @param num_req_peers number of peers we want to receive
400  *        (0 for infinite updates)
401  * @param cls a closure that will be given to the callback
402  * @param ready_cb the callback called when the peers are available
403  */
404 void
405 GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
406                          uint32_t num_updates,
407                          GNUNET_RPS_NotifyReadyCB view_update_cb,
408                          void *cls)
409 {
410   struct GNUNET_MQ_Envelope *ev;
411   struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg;
412
413   LOG (GNUNET_ERROR_TYPE_DEBUG,
414        "Client requests %" PRIu32 " view updates\n",
415        num_updates);
416   rps_handle->view_update_cb = view_update_cb;
417   rps_handle->view_update_cls = cls;
418
419   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST);
420   msg->num_updates = htonl (num_updates);
421   GNUNET_MQ_send (rps_handle->mq, ev);
422 }
423
424
425 /**
426  * Request biased stream of peers that are being put into the sampler
427  *
428  * @param rps_handle handle to the rps service
429  * @param num_req_peers number of peers we want to receive
430  *        (0 for infinite updates)
431  * @param cls a closure that will be given to the callback
432  * @param ready_cb the callback called when the peers are available
433  */
434 struct GNUNET_RPS_StreamRequestHandle *
435 GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
436                            uint32_t num_peers,
437                            GNUNET_RPS_NotifyReadyCB stream_input_cb,
438                            void *cls)
439 {
440   struct GNUNET_RPS_StreamRequestHandle *srh;
441   struct GNUNET_MQ_Envelope *ev;
442   struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
443
444   srh = new_stream_request (rps_handle,
445                             num_peers, /* num requests */
446                             stream_input_cb,
447                             cls);
448   LOG (GNUNET_ERROR_TYPE_DEBUG,
449        "Client requests %" PRIu32 " biased stream updates\n",
450        num_peers);
451
452   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
453   GNUNET_MQ_send (rps_handle->mq, ev);
454   return srh;
455 }
456
457
458 /**
459  * This function is called, when the service updates the view.
460  * It verifies that @a msg is well-formed.
461  *
462  * @param cls the closure
463  * @param msg the message
464  * @return #GNUNET_OK if @a msg is well-formed
465  */
466 static int
467 check_view_update (void *cls,
468                    const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
469 {
470   uint16_t msize = ntohs (msg->header.size);
471   uint32_t num_peers = ntohl (msg->num_peers);
472   (void) cls;
473
474   msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_ViewReply);
475   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
476        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
477   {
478     GNUNET_break (0);
479     return GNUNET_SYSERR;
480   }
481   return GNUNET_OK;
482 }
483
484
485 /**
486  * This function is called, when the service updated its view.
487  * It calls the callback the caller provided
488  * and disconnects afterwards.
489  *
490  * @param msg the message
491  */
492 static void
493 handle_view_update (void *cls,
494                     const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
495 {
496   struct GNUNET_RPS_Handle *h = cls;
497   struct GNUNET_PeerIdentity *peers;
498
499   /* Give the peers back */
500   LOG (GNUNET_ERROR_TYPE_DEBUG,
501        "New view of %" PRIu32 " peers:\n",
502        ntohl (msg->num_peers));
503
504   peers = (struct GNUNET_PeerIdentity *) &msg[1];
505   GNUNET_assert (NULL != h);
506   GNUNET_assert (NULL != h->view_update_cb);
507   h->view_update_cb (h->view_update_cls, ntohl (msg->num_peers), peers);
508 }
509
510
511 /**
512  * @brief Send message to service that this client does not want to receive
513  * further updates from the biased peer stream
514  *
515  * @param rps_handle The handle representing the service to the client
516  */
517 static void
518 cancel_stream (struct GNUNET_RPS_Handle *rps_handle)
519 {
520   struct GNUNET_MQ_Envelope *ev;
521
522   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL);
523   GNUNET_MQ_send (rps_handle->mq, ev);
524 }
525
526
527 /**
528  * @brief Cancel a specific request for updates from the biased peer stream
529  *
530  * @param srh The request handle to cancel
531  */
532 void
533 GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
534 {
535   struct GNUNET_RPS_Handle *rps_handle;
536
537   rps_handle = srh->rps_handle;
538   GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
539                                rps_handle->stream_requests_tail,
540                                srh);
541   GNUNET_free (srh);
542   if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
543 }
544
545
546 /**
547  * This function is called, when the service sends another peer from the biased
548  * stream.
549  * It calls the callback the caller provided
550  * and disconnects afterwards.
551  *
552  * TODO merge with check_view_update
553  *
554  * @param msg the message
555  */
556 static int
557 check_stream_input (void *cls,
558                     const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
559 {
560   uint16_t msize = ntohs (msg->header.size);
561   uint32_t num_peers = ntohl (msg->num_peers);
562   (void) cls;
563
564   msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
565   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
566        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
567   {
568     GNUNET_break (0);
569     return GNUNET_SYSERR;
570   }
571   return GNUNET_OK;
572 }
573
574 /**
575  * This function is called, when the service sends another peer from the biased
576  * stream.
577  * It calls the callback the caller provided
578  * and disconnects afterwards.
579  *
580  * @param msg the message
581  */
582 static void
583 handle_stream_input (void *cls,
584                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
585 {
586   struct GNUNET_RPS_Handle *h = cls;
587   const struct GNUNET_PeerIdentity *peers;
588   /* The following two pointers are used to prevent that new handles are
589    * inserted into the DLL, that is currently iterated over, from within a call
590    * to that handler_cb, are executed and in turn again add themselves to the
591    * iterated DLL infinitely */
592   struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp;
593   struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp;
594   uint64_t num_peers;
595   uint64_t num_peers_return;
596
597   peers = (struct GNUNET_PeerIdentity *) &msg[1];
598   num_peers = ntohl (msg->num_peers);
599   LOG (GNUNET_ERROR_TYPE_DEBUG,
600        "Received %" PRIu64 " peer(s) from stream input.\n",
601        num_peers);
602   srh_head_tmp = h->stream_requests_head;
603   srh_tail_tmp = h->stream_requests_tail;
604   h->stream_requests_head = NULL;
605   h->stream_requests_tail = NULL;
606   for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
607        NULL != srh_iter;
608        srh_iter = srh_iter->next)
609   {
610     LOG (GNUNET_ERROR_TYPE_DEBUG,
611         "Calling srh - left: %" PRIu64 "\n",
612         srh_iter->num_peers_left);
613     if (0 == srh_iter->num_peers_left) /* infinite updates */
614     {
615       num_peers_return = num_peers;
616     }
617     else if (num_peers > srh_iter->num_peers_left)
618     {
619       num_peers_return = num_peers - srh_iter->num_peers_left;
620     }
621     else /* num_peers <= srh_iter->num_peers_left */
622     {
623       num_peers_return = srh_iter->num_peers_left - num_peers;
624     }
625     srh_iter->ready_cb (srh_iter->ready_cb_cls,
626                         num_peers_return,
627                         peers);
628     if (0 == srh_iter->num_peers_left) ;
629     else if (num_peers_return >= srh_iter->num_peers_left)
630     {
631       remove_stream_request (srh_iter,
632                              srh_head_tmp,
633                              srh_tail_tmp);
634     }
635     else
636     {
637       srh_iter->num_peers_left -= num_peers_return;
638     }
639   }
640   for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
641        NULL != srh_iter;
642        srh_iter = srh_iter->next)
643   {
644       GNUNET_CONTAINER_DLL_insert (h->stream_requests_head,
645                                    h->stream_requests_tail,
646                                    srh_iter);
647   }
648
649   if (NULL == h->stream_requests_head)
650   {
651     cancel_stream (h);
652   }
653 }
654
655
656 /**
657  * Reconnect to the service
658  */
659 static void
660 reconnect (struct GNUNET_RPS_Handle *h);
661
662
663 /**
664  * Error handler for mq.
665  *
666  * This function is called whan mq encounters an error.
667  * Until now mq doesn't provide useful error messages.
668  *
669  * @param cls the closure
670  * @param error error code without specyfied meaning
671  */
672 static void
673 mq_error_handler (void *cls,
674                   enum GNUNET_MQ_Error error)
675 {
676   struct GNUNET_RPS_Handle *h = cls;
677   //TODO LOG
678   LOG (GNUNET_ERROR_TYPE_WARNING, "Problem with message queue. error: %i\n\
679        1: READ,\n\
680        2: WRITE,\n\
681        4: TIMEOUT\n",
682        error);
683   reconnect (h);
684   /* Resend all pending request as the service destroyed its knowledge
685    * about them */
686   resend_requests (h);
687 }
688
689
690 /**
691  * Reconnect to the service
692  */
693 static void
694 reconnect (struct GNUNET_RPS_Handle *h)
695 {
696   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
697     GNUNET_MQ_hd_var_size (reply,
698                            GNUNET_MESSAGE_TYPE_RPS_CS_REPLY,
699                            struct GNUNET_RPS_CS_ReplyMessage,
700                            h),
701     GNUNET_MQ_hd_var_size (view_update,
702                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
703                            struct GNUNET_RPS_CS_DEBUG_ViewReply,
704                            h),
705     GNUNET_MQ_hd_var_size (stream_input,
706                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
707                            struct GNUNET_RPS_CS_DEBUG_StreamReply,
708                            h),
709     GNUNET_MQ_handler_end ()
710   };
711
712   if (NULL != h->mq)
713     GNUNET_MQ_destroy (h->mq);
714   h->mq = GNUNET_CLIENT_connect (h->cfg,
715                                  "rps",
716                                  mq_handlers,
717                                  &mq_error_handler,
718                                  h);
719 }
720
721
722 /**
723  * Connect to the rps service
724  *
725  * @param cfg configuration to use
726  * @return a handle to the service
727  */
728 struct GNUNET_RPS_Handle *
729 GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
730 {
731   struct GNUNET_RPS_Handle *h;
732
733   h = GNUNET_new (struct GNUNET_RPS_Handle);
734   h->current_request_id = 0;
735   h->cfg = cfg;
736   reconnect (h);
737   if (NULL == h->mq)
738   {
739     GNUNET_free (h);
740     return NULL;
741   }
742   h->req_handlers = GNUNET_CONTAINER_multihashmap32_create (4);
743   return h;
744 }
745
746
747 /**
748  * Request n random peers.
749  *
750  * @param rps_handle handle to the rps service
751  * @param num_req_peers number of peers we want to receive
752  * @param ready_cb the callback called when the peers are available
753  * @param cls closure given to the callback
754  * @return a handle to cancel this request
755  */
756 struct GNUNET_RPS_Request_Handle *
757 GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle,
758                           uint32_t num_req_peers,
759                           GNUNET_RPS_NotifyReadyCB ready_cb,
760                           void *cls)
761 {
762   struct GNUNET_RPS_Request_Handle *rh;
763
764   rh = new_request_handle (rps_handle,
765                            num_req_peers,
766                            NULL, /* no sampler needed */
767                            ready_cb,
768                            cls);
769
770   LOG (GNUNET_ERROR_TYPE_DEBUG,
771        "Requesting %" PRIu32 " peers with id %" PRIu32 "\n",
772        num_req_peers,
773        rh->id);
774
775   send_request (rps_handle, rh->id, num_req_peers);
776   return rh;
777 }
778
779
780 /**
781  * @brief Callback to collect the peers from the biased stream and put those
782  * into the sampler.
783  *
784  * @param cls The #GNUNET_RPS_Request_Handle
785  * @param num_peers The number of peer that have been returned
786  * @param peers The array of @a num_peers that have been returned
787  */
788 void
789 collect_peers_cb (void *cls,
790                   uint64_t num_peers,
791                   const struct GNUNET_PeerIdentity *peers)
792 {
793   struct GNUNET_RPS_Request_Handle *rh = cls;
794
795   for (uint64_t i = 0; i < num_peers; i++)
796   {
797     RPS_sampler_update (rh->sampler, &peers[i]);
798   }
799 }
800
801
802 /**
803  * @brief Called once the sampler has collected all requested peers.
804  *
805  * Calls the callback provided by the client with the corresponding cls.
806  *
807  * @param peers The array of @a num_peers that has been returned.
808  * @param num_peers The number of peers that have been returned
809  * @param cls The #GNUNET_RPS_Request_Handle
810  */
811 void
812 peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
813                 uint32_t num_peers,
814                 void *cls)
815 {
816   struct GNUNET_RPS_Request_Handle *rh = cls;
817
818   rh->ready_cb (rh->ready_cb_cls,
819                 num_peers,
820                 peers);
821   // TODO cleanup, sampler, rh, cancel stuff
822   // TODO screw this function. We can give the cb,cls directly to the sampler.
823 }
824
825 /**
826  * Request n random peers.
827  *
828  * @param rps_handle handle to the rps service
829  * @param num_req_peers number of peers we want to receive
830  * @param ready_cb the callback called when the peers are available
831  * @param cls closure given to the callback
832  * @return a handle to cancel this request
833  */
834 struct GNUNET_RPS_Request_Handle *
835 GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
836                           uint32_t num_req_peers,
837                           GNUNET_RPS_NotifyReadyCB ready_cb,
838                           void *cls)
839 {
840   struct GNUNET_RPS_Request_Handle *rh;
841
842   rh = new_request_handle (rps_handle,
843                            num_req_peers,
844                            RPS_sampler_mod_init (num_req_peers,
845                                                  GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff
846                            ready_cb,
847                            cls);
848   RPS_sampler_get_n_rand_peers (rh->sampler,
849                                 num_req_peers,
850                                 peers_ready_cb,
851                                 rh);
852
853   GNUNET_RPS_stream_request (rps_handle,
854                              0, /* infinite updates */
855                              collect_peers_cb,
856                              rh); /* cls */
857
858   return rh;
859 }
860
861
862 /**
863  * Seed rps service with peerIDs.
864  *
865  * @param h handle to the rps service
866  * @param n number of peers to seed
867  * @param ids the ids of the peers seeded
868  */
869 void
870 GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
871                      uint32_t n,
872                      const struct GNUNET_PeerIdentity *ids)
873 {
874   size_t size_needed;
875   uint32_t num_peers_max;
876   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
877   struct GNUNET_MQ_Envelope *ev;
878   struct GNUNET_RPS_CS_SeedMessage *msg;
879
880   unsigned int i;
881
882   LOG (GNUNET_ERROR_TYPE_DEBUG,
883        "Client wants to seed %" PRIu32 " peers:\n",
884        n);
885   for (i = 0 ; i < n ; i++)
886     LOG (GNUNET_ERROR_TYPE_DEBUG,
887          "%u. peer: %s\n",
888          i,
889          GNUNET_i2s (&ids[i]));
890
891   /* The actual size the message occupies */
892   size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
893     n * sizeof (struct GNUNET_PeerIdentity);
894   /* The number of peers that fits in one message together with
895    * the respective header */
896   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
897       sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
898     sizeof (struct GNUNET_PeerIdentity);
899   tmp_peer_pointer = ids;
900
901   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
902   {
903     ev = GNUNET_MQ_msg_extra (msg, num_peers_max * sizeof (struct GNUNET_PeerIdentity),
904         GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
905     msg->num_peers = htonl (num_peers_max);
906     GNUNET_memcpy (&msg[1], tmp_peer_pointer, num_peers_max * sizeof (struct GNUNET_PeerIdentity));
907     GNUNET_MQ_send (h->mq, ev);
908
909     n -= num_peers_max;
910     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
911                   n * sizeof (struct GNUNET_PeerIdentity);
912     /* Set pointer to beginning of next block of num_peers_max peers */
913     tmp_peer_pointer = &ids[num_peers_max];
914   }
915
916   ev = GNUNET_MQ_msg_extra (msg, n * sizeof (struct GNUNET_PeerIdentity),
917                             GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
918   msg->num_peers = htonl (n);
919   GNUNET_memcpy (&msg[1], tmp_peer_pointer, n * sizeof (struct GNUNET_PeerIdentity));
920
921   GNUNET_MQ_send (h->mq, ev);
922 }
923
924
925 #ifdef ENABLE_MALICIOUS
926 /**
927  * Turn RPS service to act malicious.
928  *
929  * @param h handle to the rps service
930  * @param type which type of malicious peer to turn to.
931  *             0 Don't act malicious at all
932  *             1 Try to maximise representation
933  *             2 Try to partition the network
934  *               (isolate one peer from the rest)
935  * @param n number of @a ids
936  * @param ids the ids of the malicious peers
937  *            if @type is 2 the last id is the id of the
938  *            peer to be isolated from the rest
939  */
940 void
941 GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
942                           uint32_t type,
943                           uint32_t num_peers,
944                           const struct GNUNET_PeerIdentity *peer_ids,
945                           const struct GNUNET_PeerIdentity *target_peer)
946 {
947   size_t size_needed;
948   uint32_t num_peers_max;
949   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
950   struct GNUNET_MQ_Envelope *ev;
951   struct GNUNET_RPS_CS_ActMaliciousMessage *msg;
952
953   unsigned int i;
954
955   LOG (GNUNET_ERROR_TYPE_DEBUG,
956        "Client turns malicious (type %" PRIu32 ") with %" PRIu32 " other peers:\n",
957        type,
958        num_peers);
959   for (i = 0 ; i < num_peers ; i++)
960     LOG (GNUNET_ERROR_TYPE_DEBUG,
961          "%u. peer: %s\n",
962          i,
963          GNUNET_i2s (&peer_ids[i]));
964
965   /* The actual size the message would occupy */
966   size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
967     num_peers * sizeof (struct GNUNET_PeerIdentity);
968   /* The number of peers that fit in one message together with
969    * the respective header */
970   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
971       sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
972     sizeof (struct GNUNET_PeerIdentity);
973   tmp_peer_pointer = peer_ids;
974
975   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
976   {
977     LOG (GNUNET_ERROR_TYPE_DEBUG,
978          "Too many peers to send at once, sending %" PRIu32 " (all we can so far)\n",
979          num_peers_max);
980     ev = GNUNET_MQ_msg_extra (msg,
981                               num_peers_max * sizeof (struct GNUNET_PeerIdentity),
982                               GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
983     msg->type = htonl (type);
984     msg->num_peers = htonl (num_peers_max);
985     if ( (2 == type) ||
986          (3 == type) )
987       msg->attacked_peer = peer_ids[num_peers];
988     GNUNET_memcpy (&msg[1],
989             tmp_peer_pointer,
990             num_peers_max * sizeof (struct GNUNET_PeerIdentity));
991
992     GNUNET_MQ_send (h->mq, ev);
993
994     num_peers -= num_peers_max;
995     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
996                   num_peers * sizeof (struct GNUNET_PeerIdentity);
997     /* Set pointer to beginning of next block of num_peers_max peers */
998     tmp_peer_pointer = &peer_ids[num_peers_max];
999   }
1000
1001   ev = GNUNET_MQ_msg_extra (msg,
1002                             num_peers * sizeof (struct GNUNET_PeerIdentity),
1003                             GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1004   msg->type = htonl (type);
1005   msg->num_peers = htonl (num_peers);
1006   if ( (2 == type) ||
1007        (3 == type) )
1008     msg->attacked_peer = *target_peer;
1009   GNUNET_memcpy (&msg[1], tmp_peer_pointer, num_peers * sizeof (struct GNUNET_PeerIdentity));
1010
1011   GNUNET_MQ_send (h->mq, ev);
1012 }
1013 #endif /* ENABLE_MALICIOUS */
1014
1015
1016 /**
1017  * Cancle an issued request.
1018  *
1019  * @param rh request handle of request to cancle
1020  */
1021 void
1022 GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1023 {
1024   struct GNUNET_RPS_Handle *h;
1025   struct GNUNET_MQ_Envelope *ev;
1026   struct GNUNET_RPS_CS_RequestCancelMessage*msg;
1027
1028   LOG (GNUNET_ERROR_TYPE_DEBUG,
1029        "Cancelling request with id %" PRIu32 "\n",
1030        rh->id);
1031
1032   h = rh->rps_handle;
1033   GNUNET_assert (GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers,
1034         rh->id));
1035   GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, rh->id);
1036   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL);
1037   msg->id = htonl (rh->id);
1038   GNUNET_MQ_send (rh->rps_handle->mq, ev);
1039 }
1040
1041
1042 /**
1043  * Disconnect from the rps service
1044  *
1045  * @param h the handle to the rps service
1046  */
1047 void
1048 GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
1049 {
1050   GNUNET_MQ_destroy (h->mq);
1051   if (0 < GNUNET_CONTAINER_multihashmap32_size (h->req_handlers))
1052     LOG (GNUNET_ERROR_TYPE_WARNING,
1053         "Still waiting for requests\n");
1054   GNUNET_CONTAINER_multihashmap32_destroy (h->req_handlers);
1055   GNUNET_free (h);
1056 }
1057
1058
1059 /* end of rps_api.c */