fixes for DLL management and indentation
[oweals/gnunet.git] / src / rps / rps_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/rps_api.c
21  * @brief API for rps
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "rps.h"
27 #include "gnunet_rps_service.h"
28 #include "rps-sampler_client.h"
29
30 #include <inttypes.h>
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
33
34 /**
35  * Handle for a request to get peers from biased stream of ids
36  */
37 struct GNUNET_RPS_StreamRequestHandle
38 {
39   /**
40    * The client issuing the request.
41    */
42   struct GNUNET_RPS_Handle *rps_handle;
43
44   /**
45    * The callback to be called when we receive an answer.
46    */
47   GNUNET_RPS_NotifyReadyCB ready_cb;
48
49   /**
50    * The closure for the callback.
51    */
52   void *ready_cb_cls;
53
54   /**
55    * @brief Scheduler task for scheduled callback
56    */
57   struct GNUNET_SCHEDULER_Task *callback_task;
58
59   /**
60    * @brief Next element of the DLL
61    */
62   struct GNUNET_RPS_StreamRequestHandle *next;
63
64   /**
65    * @brief Previous element of the DLL
66    */
67   struct GNUNET_RPS_StreamRequestHandle *prev;
68 };
69
70
71 /**
72  * Handler to handle requests from a client.
73  */
74 struct GNUNET_RPS_Handle
75 {
76   /**
77    * The handle to the client configuration.
78    */
79   const struct GNUNET_CONFIGURATION_Handle *cfg;
80
81   /**
82    * The message queue to the client.
83    */
84   struct GNUNET_MQ_Handle *mq;
85
86   /**
87    * @brief Callback called on each update of the view
88    */
89   GNUNET_RPS_NotifyReadyCB view_update_cb;
90
91   /**
92    * @brief Closure to each requested update of the view
93    */
94   void *view_update_cls;
95
96   /**
97    * @brief Closure to each requested peer from the biased stream
98    */
99   void *stream_input_cls;
100
101   /**
102    * @brief Head of the DLL of stream requests
103    */
104   struct GNUNET_RPS_StreamRequestHandle *stream_requests_head;
105
106   /**
107    * @brief Tail of the DLL of stream requests
108    */
109   struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
110 };
111
112
113 /**
114  * Handler for a single request from a client.
115  */
116 struct GNUNET_RPS_Request_Handle
117 {
118   /**
119    * The client issuing the request.
120    */
121   struct GNUNET_RPS_Handle *rps_handle;
122
123   /**
124    * The number of requested peers.
125    */
126   uint32_t num_requests;
127
128   /**
129    * @brief The Sampler for the client request
130    */
131   struct RPS_Sampler *sampler;
132
133   /**
134    * @brief Request handle of the request to the sampler - needed to cancel the request
135    */
136   struct RPS_SamplerRequestHandle *sampler_rh;
137
138   /**
139    * @brief Request handle of the request of the biased stream of peers -
140    * needed to cancel the request
141    */
142   struct GNUNET_RPS_StreamRequestHandle *srh;
143
144   /**
145    * The callback to be called when we receive an answer.
146    */
147   GNUNET_RPS_NotifyReadyCB ready_cb;
148
149   /**
150    * The closure for the callback.
151    */
152   void *ready_cb_cls;
153 };
154
155
156 /**
157  * Struct used to pack the callback, its closure (provided by the caller)
158  * and the connection handler to the service to pass it to a callback function.
159  */
160 struct cb_cls_pack
161 {
162   /**
163    * Callback provided by the client
164    */
165   GNUNET_RPS_NotifyReadyCB cb;
166
167   /**
168    * Closure provided by the client
169    */
170   void *cls;
171
172   /**
173    * Handle to the service connection
174    */
175  struct GNUNET_CLIENT_Connection *service_conn;
176 };
177
178
179 /**
180  * @brief Peers received from the biased stream to be passed to all
181  * srh_handlers
182  */
183 static struct GNUNET_PeerIdentity *srh_callback_peers;
184
185 /**
186  * @brief Number of peers in the biased stream that are to be passed to all
187  * srh_handlers
188  */
189 static uint64_t srh_callback_num_peers;
190
191
192 /**
193  * @brief Create a new handle for a stream request
194  *
195  * @param rps_handle The rps handle
196  * @param num_peers The number of desired peers
197  * @param ready_cb The callback to be called, once all peers are ready
198  * @param cls The colsure to provide to the callback
199  *
200  * @return The handle to the stream request
201  */
202 static struct GNUNET_RPS_StreamRequestHandle *
203 new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
204                     GNUNET_RPS_NotifyReadyCB ready_cb,
205                     void *cls)
206 {
207   struct GNUNET_RPS_StreamRequestHandle *srh;
208
209   srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
210   srh->rps_handle = rps_handle;
211   srh->ready_cb = ready_cb;
212   srh->ready_cb_cls = cls;
213   GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
214                                rps_handle->stream_requests_tail,
215                                srh);
216   
217   return srh;
218 }
219
220
221 /**
222  * @brief Remove the given stream request from the list of requests and memory
223  *
224  * @param srh The request to be removed
225  */
226 static void
227 remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh)
228 {
229   struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle;
230   
231   GNUNET_assert (NULL != srh);
232   if (NULL != srh->callback_task)
233   {
234     GNUNET_SCHEDULER_cancel (srh->callback_task);
235     srh->callback_task = NULL;
236   }
237   GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
238                                rps_handle->stream_requests_tail,
239                                srh);
240   GNUNET_free (srh);
241 }
242
243
244 /**
245  * @brief Called once the sampler has collected all requested peers.
246  *
247  * Calls the callback provided by the client with the corresponding cls.
248  *
249  * @param peers The array of @a num_peers that has been returned.
250  * @param num_peers The number of peers that have been returned
251  * @param cls The #GNUNET_RPS_Request_Handle
252  */
253 static void
254 peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
255                 uint32_t num_peers,
256                 void *cls)
257 {
258   struct GNUNET_RPS_Request_Handle *rh = cls;
259
260   rh->sampler_rh = NULL;
261   rh->ready_cb (rh->ready_cb_cls,
262                 num_peers,
263                 peers);
264   GNUNET_RPS_stream_cancel (rh->srh);
265   rh->srh = NULL;
266   RPS_sampler_destroy (rh->sampler);
267   rh->sampler = NULL;
268 }
269
270
271 /**
272  * @brief Callback to collect the peers from the biased stream and put those
273  * into the sampler.
274  *
275  * @param cls The #GNUNET_RPS_Request_Handle
276  * @param num_peers The number of peer that have been returned
277  * @param peers The array of @a num_peers that have been returned
278  */
279 static void
280 collect_peers_cb (void *cls,
281                   uint64_t num_peers,
282                   const struct GNUNET_PeerIdentity *peers)
283 {
284   struct GNUNET_RPS_Request_Handle *rh = cls;
285
286   LOG (GNUNET_ERROR_TYPE_DEBUG,
287        "Service sent %" PRIu64 " peers from stream\n",
288        num_peers);
289   for (uint64_t i = 0; i < num_peers; i++)
290   {
291     RPS_sampler_update (rh->sampler, &peers[i]);
292   }
293 }
294
295
296 /* Get internals for debugging/profiling purposes */
297
298 /**
299  * Request updates of view
300  *
301  * @param rps_handle handle to the rps service
302  * @param num_req_peers number of peers we want to receive
303  *        (0 for infinite updates)
304  * @param cls a closure that will be given to the callback
305  * @param ready_cb the callback called when the peers are available
306  */
307 void
308 GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
309                          uint32_t num_updates,
310                          GNUNET_RPS_NotifyReadyCB view_update_cb,
311                          void *cls)
312 {
313   struct GNUNET_MQ_Envelope *ev;
314   struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg;
315
316   LOG (GNUNET_ERROR_TYPE_DEBUG,
317        "Client requests %" PRIu32 " view updates\n",
318        num_updates);
319   rps_handle->view_update_cb = view_update_cb;
320   rps_handle->view_update_cls = cls;
321
322   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST);
323   msg->num_updates = htonl (num_updates);
324   GNUNET_MQ_send (rps_handle->mq, ev);
325 }
326
327
328 void
329 GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle)
330 {
331   struct GNUNET_MQ_Envelope *ev;
332
333   GNUNET_assert (NULL != rps_handle->view_update_cb);
334
335   rps_handle->view_update_cb = NULL;
336
337   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL);
338   GNUNET_MQ_send (rps_handle->mq, ev);
339 }
340
341
342 /**
343  * Request biased stream of peers that are being put into the sampler
344  *
345  * @param rps_handle handle to the rps service
346  * @param cls a closure that will be given to the callback
347  * @param ready_cb the callback called when the peers are available
348  */
349 struct GNUNET_RPS_StreamRequestHandle *
350 GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
351                            GNUNET_RPS_NotifyReadyCB stream_input_cb,
352                            void *cls)
353 {
354   struct GNUNET_RPS_StreamRequestHandle *srh;
355   struct GNUNET_MQ_Envelope *ev;
356   struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
357
358   srh = new_stream_request (rps_handle,
359                             stream_input_cb,
360                             cls);
361   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
362
363   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
364   GNUNET_MQ_send (rps_handle->mq, ev);
365   return srh;
366 }
367
368
369 /**
370  * This function is called, when the service updates the view.
371  * It verifies that @a msg is well-formed.
372  *
373  * @param cls the closure
374  * @param msg the message
375  * @return #GNUNET_OK if @a msg is well-formed
376  */
377 static int
378 check_view_update (void *cls,
379                    const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
380 {
381   uint16_t msize = ntohs (msg->header.size);
382   uint32_t num_peers = ntohl (msg->num_peers);
383   (void) cls;
384
385   msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_ViewReply);
386   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
387        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
388   {
389     GNUNET_break (0);
390     return GNUNET_SYSERR;
391   }
392   return GNUNET_OK;
393 }
394
395
396 /**
397  * This function is called, when the service updated its view.
398  * It calls the callback the caller provided
399  * and disconnects afterwards.
400  *
401  * @param msg the message
402  */
403 static void
404 handle_view_update (void *cls,
405                     const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
406 {
407   struct GNUNET_RPS_Handle *h = cls;
408   struct GNUNET_PeerIdentity *peers;
409
410   /* Give the peers back */
411   LOG (GNUNET_ERROR_TYPE_DEBUG,
412        "New view of %" PRIu32 " peers:\n",
413        ntohl (msg->num_peers));
414
415   peers = (struct GNUNET_PeerIdentity *) &msg[1];
416   GNUNET_assert (NULL != h);
417   GNUNET_assert (NULL != h->view_update_cb);
418   h->view_update_cb (h->view_update_cls, ntohl (msg->num_peers), peers);
419 }
420
421
422 /**
423  * @brief Send message to service that this client does not want to receive
424  * further updates from the biased peer stream
425  *
426  * @param rps_handle The handle representing the service to the client
427  */
428 static void
429 cancel_stream (struct GNUNET_RPS_Handle *rps_handle)
430 {
431   struct GNUNET_MQ_Envelope *ev;
432
433   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL);
434   GNUNET_MQ_send (rps_handle->mq, ev);
435 }
436
437
438 /**
439  * @brief Cancel a specific request for updates from the biased peer stream
440  *
441  * @param srh The request handle to cancel
442  */
443 void
444 GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
445 {
446   struct GNUNET_RPS_Handle *rps_handle;
447
448   rps_handle = srh->rps_handle;
449   remove_stream_request (srh);
450   if (NULL == rps_handle->stream_requests_head)
451     cancel_stream (rps_handle);
452 }
453
454
455 /**
456  * This function is called, when the service sends another peer from the biased
457  * stream.
458  * It calls the callback the caller provided
459  * and disconnects afterwards.
460  *
461  * TODO merge with check_view_update
462  *
463  * @param msg the message
464  */
465 static int
466 check_stream_input (void *cls,
467                     const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
468 {
469   uint16_t msize = ntohs (msg->header.size);
470   uint32_t num_peers = ntohl (msg->num_peers);
471   (void) cls;
472
473   msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
474   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
475        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
476   {
477     GNUNET_break (0);
478     return GNUNET_SYSERR;
479   }
480   return GNUNET_OK;
481 }
482
483
484 /**
485  * @brief Called by the scheduler to call the callbacks of the srh handlers
486  *
487  * @param cls Stream request handle
488  */
489 static void
490 srh_callback_scheduled (void *cls)
491 {
492   struct GNUNET_RPS_StreamRequestHandle *srh = cls;
493
494   srh->callback_task = NULL;
495   srh->ready_cb (srh->ready_cb_cls,
496                  srh_callback_num_peers,
497                  srh_callback_peers);
498 }
499
500
501 /**
502  * This function is called, when the service sends another peer from the biased
503  * stream.
504  * It calls the callback the caller provided
505  * and disconnects afterwards.
506  *
507  * @param msg the message
508  */
509 static void
510 handle_stream_input (void *cls,
511                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
512 {
513   struct GNUNET_RPS_Handle *h = cls;
514   //const struct GNUNET_PeerIdentity *peers;
515   uint64_t num_peers;
516   struct GNUNET_RPS_StreamRequestHandle *srh_iter;
517   struct GNUNET_RPS_StreamRequestHandle *srh_next;
518
519   //peers = (struct GNUNET_PeerIdentity *) &msg[1];
520   num_peers = ntohl (msg->num_peers);
521   srh_callback_num_peers = num_peers;
522   GNUNET_free_non_null (srh_callback_peers);
523   srh_callback_peers = GNUNET_new_array (num_peers,
524                                          struct GNUNET_PeerIdentity);
525   GNUNET_memcpy (srh_callback_peers,
526                  &msg[1],
527                  num_peers * sizeof (struct GNUNET_PeerIdentity));
528   LOG (GNUNET_ERROR_TYPE_DEBUG,
529        "Received %" PRIu64 " peer(s) from stream input.\n",
530        num_peers);
531   for (srh_iter = h->stream_requests_head;
532        NULL != srh_iter;
533        srh_iter = srh_next)
534   {
535     LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
536     /* Store next pointer - srh might be removed/freed in callback */
537     srh_next = srh_iter->next;
538     if (NULL != srh_iter->callback_task)
539       GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
540     srh_iter->callback_task =
541       GNUNET_SCHEDULER_add_now (&srh_callback_scheduled,
542                                 srh_iter);
543   }
544
545   if (NULL == h->stream_requests_head)
546   {
547     cancel_stream (h);
548   }
549 }
550
551
552 /**
553  * Reconnect to the service
554  */
555 static void
556 reconnect (struct GNUNET_RPS_Handle *h);
557
558
559 /**
560  * Error handler for mq.
561  *
562  * This function is called whan mq encounters an error.
563  * Until now mq doesn't provide useful error messages.
564  *
565  * @param cls the closure
566  * @param error error code without specyfied meaning
567  */
568 static void
569 mq_error_handler (void *cls,
570                   enum GNUNET_MQ_Error error)
571 {
572   struct GNUNET_RPS_Handle *h = cls;
573   //TODO LOG
574   LOG (GNUNET_ERROR_TYPE_WARNING, "Problem with message queue. error: %i\n\
575        1: READ,\n\
576        2: WRITE,\n\
577        4: TIMEOUT\n",
578        // TODO: write GNUNET_MQ_strerror (error)
579        error);
580   reconnect (h);
581   /* Resend all pending request as the service destroyed its knowledge
582    * about them */
583 }
584
585
586 /**
587  * @brief Create the hash value from the share value that defines the sub
588  * (-group)
589  *
590  * @param share_val Share value
591  * @param hash[out] Pointer to the location in which the hash will be stored.
592  */
593 static void
594 hash_from_share_val (const char *share_val,
595                      struct GNUNET_HashCode *hash)
596 {
597   GNUNET_CRYPTO_kdf (hash,
598                      sizeof (struct GNUNET_HashCode),
599                      "rps",
600                      strlen ("rps"),
601                      share_val,
602                      strlen (share_val),
603                      NULL, 0);
604 }
605
606
607 /**
608  * Reconnect to the service
609  */
610 static void
611 reconnect (struct GNUNET_RPS_Handle *h)
612 {
613   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
614     GNUNET_MQ_hd_var_size (view_update,
615                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
616                            struct GNUNET_RPS_CS_DEBUG_ViewReply,
617                            h),
618     GNUNET_MQ_hd_var_size (stream_input,
619                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
620                            struct GNUNET_RPS_CS_DEBUG_StreamReply,
621                            h),
622     GNUNET_MQ_handler_end ()
623   };
624
625   if (NULL != h->mq)
626     GNUNET_MQ_destroy (h->mq);
627   h->mq = GNUNET_CLIENT_connect (h->cfg,
628                                  "rps",
629                                  mq_handlers,
630                                  &mq_error_handler,
631                                  h);
632 }
633
634
635 /**
636  * Connect to the rps service
637  *
638  * @param cfg configuration to use
639  * @return a handle to the service
640  */
641 struct GNUNET_RPS_Handle *
642 GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
643 {
644   struct GNUNET_RPS_Handle *h;
645
646   h = GNUNET_new (struct GNUNET_RPS_Handle);
647   h->cfg = cfg;
648   reconnect (h);
649   if (NULL == h->mq)
650   {
651     GNUNET_free (h);
652     return NULL;
653   }
654   return h;
655 }
656
657
658 /**
659  * @brief Start a sub with the given shared value
660  *
661  * @param h Handle to rps
662  * @param shared_value The shared value that defines the members of the sub (-gorup)
663  */
664 void
665 GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h,
666                       const char *shared_value)
667 {
668   struct GNUNET_RPS_CS_SubStartMessage *msg;
669   struct GNUNET_MQ_Envelope *ev;
670
671   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START);
672   hash_from_share_val (shared_value, &msg->hash);
673   msg->round_interval = GNUNET_TIME_relative_hton (// TODO read from config!
674     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30));
675   GNUNET_assert (0 != msg->round_interval.rel_value_us__);
676
677   GNUNET_MQ_send (h->mq, ev);
678 }
679
680
681 /**
682  * @brief Stop a sub with the given shared value
683  *
684  * @param h Handle to rps
685  * @param shared_value The shared value that defines the members of the sub (-gorup)
686  */
687 void
688 GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h,
689                      const char *shared_value)
690 {
691   struct GNUNET_RPS_CS_SubStopMessage *msg;
692   struct GNUNET_MQ_Envelope *ev;
693
694   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP);
695   hash_from_share_val (shared_value, &msg->hash);
696
697   GNUNET_MQ_send (h->mq, ev);
698 }
699
700
701 /**
702  * Request n random peers.
703  *
704  * @param rps_handle handle to the rps service
705  * @param num_req_peers number of peers we want to receive
706  * @param ready_cb the callback called when the peers are available
707  * @param cls closure given to the callback
708  * @return a handle to cancel this request
709  */
710 struct GNUNET_RPS_Request_Handle *
711 GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
712                           uint32_t num_req_peers,
713                           GNUNET_RPS_NotifyReadyCB ready_cb,
714                           void *cls)
715 {
716   struct GNUNET_RPS_Request_Handle *rh;
717
718   LOG (GNUNET_ERROR_TYPE_INFO,
719        "Client requested %" PRIu32 " peers\n",
720        num_req_peers);
721   rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
722   rh->rps_handle = rps_handle;
723   rh->num_requests = num_req_peers;
724   rh->sampler = RPS_sampler_mod_init (num_req_peers,
725                                       GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
726   rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
727                                                  num_req_peers,
728                                                  peers_ready_cb,
729                                                  rh);
730   rh->srh = GNUNET_RPS_stream_request (rps_handle,
731                                        collect_peers_cb,
732                                        rh); /* cls */
733   rh->ready_cb = ready_cb;
734   rh->ready_cb_cls = cls;
735
736   return rh;
737 }
738
739
740 /**
741  * Seed rps service with peerIDs.
742  *
743  * @param h handle to the rps service
744  * @param n number of peers to seed
745  * @param ids the ids of the peers seeded
746  */
747 void
748 GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
749                      uint32_t n,
750                      const struct GNUNET_PeerIdentity *ids)
751 {
752   size_t size_needed;
753   uint32_t num_peers_max;
754   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
755   struct GNUNET_MQ_Envelope *ev;
756   struct GNUNET_RPS_CS_SeedMessage *msg;
757
758   LOG (GNUNET_ERROR_TYPE_DEBUG,
759        "Client wants to seed %" PRIu32 " peers:\n",
760        n);
761   for (unsigned int i = 0 ; i < n ; i++)
762     LOG (GNUNET_ERROR_TYPE_DEBUG,
763          "%u. peer: %s\n",
764          i,
765          GNUNET_i2s (&ids[i]));
766
767   /* The actual size the message occupies */
768   size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
769     n * sizeof (struct GNUNET_PeerIdentity);
770   /* The number of peers that fits in one message together with
771    * the respective header */
772   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
773       sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
774     sizeof (struct GNUNET_PeerIdentity);
775   tmp_peer_pointer = ids;
776
777   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
778   {
779     ev = GNUNET_MQ_msg_extra (msg,
780                               num_peers_max * sizeof (struct GNUNET_PeerIdentity),
781                               GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
782     msg->num_peers = htonl (num_peers_max);
783     GNUNET_memcpy (&msg[1],
784                    tmp_peer_pointer,
785                    num_peers_max * sizeof (struct GNUNET_PeerIdentity));
786     GNUNET_MQ_send (h->mq,
787                     ev);
788     n -= num_peers_max;
789     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
790                   n * sizeof (struct GNUNET_PeerIdentity);
791     /* Set pointer to beginning of next block of num_peers_max peers */
792     tmp_peer_pointer = &ids[num_peers_max];
793   }
794
795   ev = GNUNET_MQ_msg_extra (msg,
796                             n * sizeof (struct GNUNET_PeerIdentity),
797                             GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
798   msg->num_peers = htonl (n);
799   GNUNET_memcpy (&msg[1],
800                  tmp_peer_pointer,
801                  n * sizeof (struct GNUNET_PeerIdentity));
802   GNUNET_MQ_send (h->mq,
803                   ev);
804 }
805
806
807 #ifdef ENABLE_MALICIOUS
808 /**
809  * Turn RPS service to act malicious.
810  *
811  * @param h handle to the rps service
812  * @param type which type of malicious peer to turn to.
813  *             0 Don't act malicious at all
814  *             1 Try to maximise representation
815  *             2 Try to partition the network
816  *               (isolate one peer from the rest)
817  * @param n number of @a ids
818  * @param ids the ids of the malicious peers
819  *            if @type is 2 the last id is the id of the
820  *            peer to be isolated from the rest
821  */
822 void
823 GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
824                           uint32_t type,
825                           uint32_t num_peers,
826                           const struct GNUNET_PeerIdentity *peer_ids,
827                           const struct GNUNET_PeerIdentity *target_peer)
828 {
829   size_t size_needed;
830   uint32_t num_peers_max;
831   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
832   struct GNUNET_MQ_Envelope *ev;
833   struct GNUNET_RPS_CS_ActMaliciousMessage *msg;
834
835   unsigned int i;
836
837   LOG (GNUNET_ERROR_TYPE_DEBUG,
838        "Client turns malicious (type %" PRIu32 ") with %" PRIu32 " other peers:\n",
839        type,
840        num_peers);
841   for (i = 0 ; i < num_peers ; i++)
842     LOG (GNUNET_ERROR_TYPE_DEBUG,
843          "%u. peer: %s\n",
844          i,
845          GNUNET_i2s (&peer_ids[i]));
846
847   /* The actual size the message would occupy */
848   size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
849     num_peers * sizeof (struct GNUNET_PeerIdentity);
850   /* The number of peers that fit in one message together with
851    * the respective header */
852   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
853       sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
854     sizeof (struct GNUNET_PeerIdentity);
855   tmp_peer_pointer = peer_ids;
856
857   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
858   {
859     LOG (GNUNET_ERROR_TYPE_DEBUG,
860          "Too many peers to send at once, sending %" PRIu32 " (all we can so far)\n",
861          num_peers_max);
862     ev = GNUNET_MQ_msg_extra (msg,
863                               num_peers_max * sizeof (struct GNUNET_PeerIdentity),
864                               GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
865     msg->type = htonl (type);
866     msg->num_peers = htonl (num_peers_max);
867     if ( (2 == type) ||
868          (3 == type) )
869       msg->attacked_peer = peer_ids[num_peers];
870     GNUNET_memcpy (&msg[1],
871             tmp_peer_pointer,
872             num_peers_max * sizeof (struct GNUNET_PeerIdentity));
873
874     GNUNET_MQ_send (h->mq, ev);
875
876     num_peers -= num_peers_max;
877     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
878                   num_peers * sizeof (struct GNUNET_PeerIdentity);
879     /* Set pointer to beginning of next block of num_peers_max peers */
880     tmp_peer_pointer = &peer_ids[num_peers_max];
881   }
882
883   ev = GNUNET_MQ_msg_extra (msg,
884                             num_peers * sizeof (struct GNUNET_PeerIdentity),
885                             GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
886   msg->type = htonl (type);
887   msg->num_peers = htonl (num_peers);
888   if ( (2 == type) ||
889        (3 == type) )
890     msg->attacked_peer = *target_peer;
891   GNUNET_memcpy (&msg[1],
892                  tmp_peer_pointer,
893                  num_peers * sizeof (struct GNUNET_PeerIdentity));
894
895   GNUNET_MQ_send (h->mq, ev);
896 }
897 #endif /* ENABLE_MALICIOUS */
898
899
900 /**
901  * Cancle an issued request.
902  *
903  * @param rh request handle of request to cancle
904  */
905 void
906 GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
907 {
908   struct GNUNET_RPS_Handle *h;
909
910   h = rh->rps_handle;
911   GNUNET_assert (NULL != rh);
912   GNUNET_assert (h == rh->srh->rps_handle);
913   GNUNET_RPS_stream_cancel (rh->srh);
914   rh->srh = NULL;
915   if (NULL == h->stream_requests_head) cancel_stream(h);
916   if (NULL != rh->sampler_rh)
917   {
918     RPS_sampler_request_cancel (rh->sampler_rh);
919   }
920   RPS_sampler_destroy (rh->sampler);
921   GNUNET_free (rh);
922 }
923
924
925 /**
926  * Disconnect from the rps service
927  *
928  * @param h the handle to the rps service
929  */
930 void
931 GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
932 {
933   if (NULL != h->stream_requests_head)
934   {
935     struct GNUNET_RPS_StreamRequestHandle *srh_next;
936
937     LOG (GNUNET_ERROR_TYPE_WARNING,
938         "Still waiting for replies\n");
939     for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
940          NULL != srh_iter;
941          srh_iter = srh_next)
942     {
943       srh_next = srh_iter->next;
944       GNUNET_RPS_stream_cancel (srh_iter);
945     }
946   }
947   if (NULL != srh_callback_peers)
948   {
949     GNUNET_free (srh_callback_peers);
950     srh_callback_peers = NULL;
951   }
952   if (NULL != h->view_update_cb)
953   {
954     LOG (GNUNET_ERROR_TYPE_WARNING,
955         "Still waiting for view updates\n");
956     GNUNET_RPS_view_request_cancel (h);
957   }
958   GNUNET_MQ_destroy (h->mq);
959   GNUNET_free (h);
960 }
961
962
963 /* end of rps_api.c */