Start changing architecture of rps service/client
[oweals/gnunet.git] / src / rps / rps_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/rps_api.c
21  * @brief API for rps
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "rps.h"
27 #include "gnunet_rps_service.h"
28 #include "gnunet-service-rps_sampler.h"
29
30 #include <inttypes.h>
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
33
34 /**
35  * Handler to handle requests from a client.
36  */
37 struct GNUNET_RPS_Handle
38 {
39   /**
40    * The handle to the client configuration.
41    */
42   const struct GNUNET_CONFIGURATION_Handle *cfg;
43
44   /**
45    * The message queue to the client.
46    */
47   struct GNUNET_MQ_Handle *mq;
48
49   /**
50    * Array of Request_Handles.
51    */
52   struct GNUNET_CONTAINER_MultiHashMap32 *req_handlers;
53
54   /**
55    * The id of the last request.
56    */
57   uint32_t current_request_id;
58
59   /**
60    * @brief Callback called on each update of the view
61    */
62   GNUNET_RPS_NotifyReadyCB view_update_cb;
63
64   /**
65    * @brief Closure to each requested update of the view
66    */
67   void *view_update_cls;
68
69   /**
70    * @brief Callback called on each peer of the biased input stream
71    */
72   GNUNET_RPS_NotifyReadyCB stream_input_cb;
73
74   /**
75    * @brief Closure to each requested peer from the biased stream
76    */
77   void *stream_input_cls;
78 };
79
80
81 /**
82  * Handler for a single request from a client.
83  */
84 struct GNUNET_RPS_Request_Handle
85 {
86   /**
87    * The client issuing the request.
88    */
89   struct GNUNET_RPS_Handle *rps_handle;
90
91   /**
92    * The id of the request.
93    */
94   uint32_t id;
95
96   /**
97    * The number of requested peers.
98    */
99   uint32_t num_requests;
100
101   /**
102    * @brief The Sampler for the client request
103    */
104   struct RPS_Sampler *sampler;
105
106   /**
107    * The callback to be called when we receive an answer.
108    */
109   GNUNET_RPS_NotifyReadyCB ready_cb;
110
111   /**
112    * The closure for the callback.
113    */
114   void *ready_cb_cls;
115 };
116
117
118 /**
119  * Struct used to pack the callback, its closure (provided by the caller)
120  * and the connection handler to the service to pass it to a callback function.
121  */
122 struct cb_cls_pack
123 {
124   /**
125    * Callback provided by the client
126    */
127   GNUNET_RPS_NotifyReadyCB cb;
128
129   /**
130    * Closure provided by the client
131    */
132   void *cls;
133
134   /**
135    * Handle to the service connection
136    */
137  struct GNUNET_CLIENT_Connection *service_conn;
138 };
139
140
141 /**
142  * @brief Send a request to the service.
143  *
144  * @param h rps handle
145  * @param id id of the request
146  * @param num_req_peers number of peers
147  */
148 void
149 send_request (const struct GNUNET_RPS_Handle *h,
150               uint32_t id,
151               uint32_t num_req_peers)
152 {
153   struct GNUNET_MQ_Envelope *ev;
154   struct GNUNET_RPS_CS_RequestMessage *msg;
155
156   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST);
157   msg->num_peers = htonl (num_req_peers);
158   msg->id = htonl (id);
159   GNUNET_MQ_send (h->mq, ev);
160 }
161
162 /**
163  * @brief Iterator function over pending requests
164  *
165  * Implements #GNUNET_CONTAINER_HashMapIterator32
166  *
167  * @param cls rps handle
168  * @param key id of the request
169  * @param value request handle
170  *
171  * @return GNUNET_YES to continue iteration
172  */
173 int
174 resend_requests_iterator (void *cls, uint32_t key, void *value)
175 {
176   const struct GNUNET_RPS_Handle *h = cls;
177   const struct GNUNET_RPS_Request_Handle *req_handle = value;
178   (void) key;
179
180   send_request (h, req_handle->id, req_handle->num_requests);
181   return GNUNET_YES; /* continue iterating */
182 }
183
184 /**
185  * @brief Resend all pending requests
186  *
187  * This is used to resend all pending requests after the client
188  * reconnected to the service, because the service cancels all
189  * pending requests after reconnection.
190  *
191  * @param h rps handle
192  */
193 void
194 resend_requests (struct GNUNET_RPS_Handle *h)
195 {
196   GNUNET_CONTAINER_multihashmap32_iterate (h->req_handlers,
197                                            resend_requests_iterator,
198                                            h);
199 }
200
201
202 /**
203  * This function is called, when the service replies to our request.
204  * It verifies that @a msg is well-formed.
205  *
206  * @param cls the closure
207  * @param msg the message
208  * @return #GNUNET_OK if @a msg is well-formed
209  */
210 static int
211 check_reply (void *cls,
212              const struct GNUNET_RPS_CS_ReplyMessage *msg)
213 {
214   uint16_t msize = ntohs (msg->header.size);
215   uint32_t num_peers = ntohl (msg->num_peers);
216   (void) cls;
217
218   msize -= sizeof (struct GNUNET_RPS_CS_ReplyMessage);
219   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
220        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
221   {
222     GNUNET_break (0);
223     return GNUNET_SYSERR;
224   }
225   return GNUNET_OK;
226 }
227
228
229 /**
230  * This function is called, when the service replies to our request.
231  * It calls the callback the caller gave us with the provided closure
232  * and disconnects afterwards.
233  *
234  * @param cls the closure
235  * @param msg the message
236  */
237 static void
238 handle_reply (void *cls,
239               const struct GNUNET_RPS_CS_ReplyMessage *msg)
240 {
241   struct GNUNET_RPS_Handle *h = cls;
242   struct GNUNET_PeerIdentity *peers;
243   struct GNUNET_RPS_Request_Handle *rh;
244   uint32_t id;
245
246   /* Give the peers back */
247   id = ntohl (msg->id);
248   LOG (GNUNET_ERROR_TYPE_DEBUG,
249        "Service replied with %" PRIu32 " peers for id %" PRIu32 "\n",
250        ntohl (msg->num_peers),
251        id);
252
253   peers = (struct GNUNET_PeerIdentity *) &msg[1];
254   GNUNET_assert (GNUNET_YES ==
255       GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, id));
256   rh = GNUNET_CONTAINER_multihashmap32_get (h->req_handlers, id);
257   GNUNET_assert (NULL != rh);
258   GNUNET_assert (rh->num_requests == ntohl (msg->num_peers));
259   GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, id);
260   rh->ready_cb (rh->ready_cb_cls,
261                 ntohl (msg->num_peers),
262                 peers);
263 }
264
265
266 /* Get internals for debugging/profiling purposes */
267
268 /**
269  * Request updates of view
270  *
271  * @param rps_handle handle to the rps service
272  * @param num_req_peers number of peers we want to receive
273  *        (0 for infinite updates)
274  * @param cls a closure that will be given to the callback
275  * @param ready_cb the callback called when the peers are available
276  */
277 void
278 GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
279                          uint32_t num_updates,
280                          GNUNET_RPS_NotifyReadyCB view_update_cb,
281                          void *cls)
282 {
283   struct GNUNET_MQ_Envelope *ev;
284   struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg;
285
286   LOG (GNUNET_ERROR_TYPE_DEBUG,
287        "Client requests %" PRIu32 " view updates\n",
288        num_updates);
289   rps_handle->view_update_cb = view_update_cb;
290   rps_handle->view_update_cls = cls;
291
292   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST);
293   msg->num_updates = htonl (num_updates);
294   GNUNET_MQ_send (rps_handle->mq, ev);
295 }
296
297
298 /**
299  * Request biased stream of peers that are being put into the sampler
300  *
301  * @param rps_handle handle to the rps service
302  * @param num_req_peers number of peers we want to receive
303  *        (0 for infinite updates)
304  * @param cls a closure that will be given to the callback
305  * @param ready_cb the callback called when the peers are available
306  */
307 void
308 GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
309                            uint32_t num_peers,
310                            GNUNET_RPS_NotifyReadyCB stream_input_cb,
311                            void *cls)
312 {
313   struct GNUNET_MQ_Envelope *ev;
314   struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
315
316   LOG (GNUNET_ERROR_TYPE_DEBUG,
317        "Client requests %" PRIu32 " biased stream updates\n",
318        num_peers);
319   rps_handle->stream_input_cb = stream_input_cb;
320   rps_handle->stream_input_cls = cls;
321
322   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
323   msg->num_peers = htonl (num_peers);
324   GNUNET_MQ_send (rps_handle->mq, ev);
325 }
326
327
328 /**
329  * This function is called, when the service updates the view.
330  * It verifies that @a msg is well-formed.
331  *
332  * @param cls the closure
333  * @param msg the message
334  * @return #GNUNET_OK if @a msg is well-formed
335  */
336 static int
337 check_view_update (void *cls,
338                    const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
339 {
340   uint16_t msize = ntohs (msg->header.size);
341   uint32_t num_peers = ntohl (msg->num_peers);
342   (void) cls;
343
344   msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_ViewReply);
345   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
346        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
347   {
348     GNUNET_break (0);
349     return GNUNET_SYSERR;
350   }
351   return GNUNET_OK;
352 }
353
354
355 /**
356  * This function is called, when the service updated its view.
357  * It calls the callback the caller provided
358  * and disconnects afterwards.
359  *
360  * @param msg the message
361  */
362 static void
363 handle_view_update (void *cls,
364                     const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
365 {
366   struct GNUNET_RPS_Handle *h = cls;
367   struct GNUNET_PeerIdentity *peers;
368
369   /* Give the peers back */
370   LOG (GNUNET_ERROR_TYPE_DEBUG,
371        "New view of %" PRIu32 " peers:\n",
372        ntohl (msg->num_peers));
373
374   peers = (struct GNUNET_PeerIdentity *) &msg[1];
375   GNUNET_assert (NULL != h);
376   GNUNET_assert (NULL != h->view_update_cb);
377   h->view_update_cb (h->view_update_cls, ntohl (msg->num_peers), peers);
378 }
379
380
381 /**
382  * This function is called, when the service sends another peer from the biased
383  * stream.
384  * It calls the callback the caller provided
385  * and disconnects afterwards.
386  *
387  * TODO merge with check_view_update
388  *
389  * @param msg the message
390  */
391 static int
392 check_stream_input (void *cls,
393                     const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
394 {
395   uint16_t msize = ntohs (msg->header.size);
396   uint32_t num_peers = ntohl (msg->num_peers);
397   (void) cls;
398
399   msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
400   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
401        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
402   {
403     GNUNET_break (0);
404     return GNUNET_SYSERR;
405   }
406   return GNUNET_OK;
407 }
408
409 /**
410  * This function is called, when the service sends another peer from the biased
411  * stream.
412  * It calls the callback the caller provided
413  * and disconnects afterwards.
414  *
415  * @param msg the message
416  */
417 static void
418 handle_stream_input (void *cls,
419                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
420 {
421   struct GNUNET_RPS_Handle *h = cls;
422   const struct GNUNET_PeerIdentity *peers;
423
424   /* Give the peers back */
425   LOG (GNUNET_ERROR_TYPE_DEBUG,
426        "New peer of %" PRIu64 " biased input stream\n",
427        ntohl (msg->num_peers));
428
429   peers = (struct GNUNET_PeerIdentity *) &msg[1];
430   GNUNET_assert (NULL != h);
431   GNUNET_assert (NULL != h->stream_input_cb);
432   h->stream_input_cb (h->stream_input_cls, ntohl (msg->num_peers), peers);
433 }
434
435
436 /**
437  * Reconnect to the service
438  */
439 static void
440 reconnect (struct GNUNET_RPS_Handle *h);
441
442
443 /**
444  * Error handler for mq.
445  *
446  * This function is called whan mq encounters an error.
447  * Until now mq doesn't provide useful error messages.
448  *
449  * @param cls the closure
450  * @param error error code without specyfied meaning
451  */
452 static void
453 mq_error_handler (void *cls,
454                   enum GNUNET_MQ_Error error)
455 {
456   struct GNUNET_RPS_Handle *h = cls;
457   //TODO LOG
458   LOG (GNUNET_ERROR_TYPE_WARNING, "Problem with message queue. error: %i\n\
459        1: READ,\n\
460        2: WRITE,\n\
461        4: TIMEOUT\n",
462        error);
463   reconnect (h);
464   /* Resend all pending request as the service destroyed its knowledge
465    * about them */
466   resend_requests (h);
467 }
468
469
470 /**
471  * Reconnect to the service
472  */
473 static void
474 reconnect (struct GNUNET_RPS_Handle *h)
475 {
476   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
477     GNUNET_MQ_hd_var_size (reply,
478                            GNUNET_MESSAGE_TYPE_RPS_CS_REPLY,
479                            struct GNUNET_RPS_CS_ReplyMessage,
480                            h),
481     GNUNET_MQ_hd_var_size (view_update,
482                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
483                            struct GNUNET_RPS_CS_DEBUG_ViewReply,
484                            h),
485     GNUNET_MQ_hd_var_size (stream_input,
486                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
487                            struct GNUNET_RPS_CS_DEBUG_StreamReply,
488                            h),
489     GNUNET_MQ_handler_end ()
490   };
491
492   if (NULL != h->mq)
493     GNUNET_MQ_destroy (h->mq);
494   h->mq = GNUNET_CLIENT_connect (h->cfg,
495                                  "rps",
496                                  mq_handlers,
497                                  &mq_error_handler,
498                                  h);
499 }
500
501
502 /**
503  * Connect to the rps service
504  *
505  * @param cfg configuration to use
506  * @return a handle to the service
507  */
508 struct GNUNET_RPS_Handle *
509 GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
510 {
511   struct GNUNET_RPS_Handle *h;
512
513   h = GNUNET_new (struct GNUNET_RPS_Handle);
514   h->current_request_id = 0;
515   h->cfg = cfg;
516   reconnect (h);
517   if (NULL == h->mq)
518   {
519     GNUNET_free (h);
520     return NULL;
521   }
522   h->req_handlers = GNUNET_CONTAINER_multihashmap32_create (4);
523   return h;
524 }
525
526
527 /**
528  * @brief Create new request handle
529  *
530  * @param rps_handle Handle to the service
531  * @param num_requests Number of requests
532  * @param ready_cb Callback
533  * @param cls Closure
534  *
535  * @return The newly created request handle
536  */
537 static struct GNUNET_RPS_Request_Handle *
538 new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
539                     uint64_t num_requests,
540                     struct RPS_Sampler *sampler,
541                     GNUNET_RPS_NotifyReadyCB ready_cb,
542                     void *cls)
543 {
544   struct GNUNET_RPS_Request_Handle *rh;
545
546   rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
547   rh->rps_handle = rps_handle;
548   rh->id = rps_handle->current_request_id++;
549   rh->num_requests = num_requests;
550   rh->sampler = sampler;
551   rh->ready_cb = ready_cb;
552   rh->ready_cb_cls = cls;
553   GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh,
554       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
555
556   return rh;
557 }
558
559
560 /**
561  * Request n random peers.
562  *
563  * @param rps_handle handle to the rps service
564  * @param num_req_peers number of peers we want to receive
565  * @param ready_cb the callback called when the peers are available
566  * @param cls closure given to the callback
567  * @return a handle to cancel this request
568  */
569 struct GNUNET_RPS_Request_Handle *
570 GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle,
571                           uint32_t num_req_peers,
572                           GNUNET_RPS_NotifyReadyCB ready_cb,
573                           void *cls)
574 {
575   struct GNUNET_RPS_Request_Handle *rh;
576
577   rh = new_request_handle (rps_handle,
578                            num_req_peers,
579                            NULL, /* no sampler needed */
580                            ready_cb,
581                            cls);
582
583   LOG (GNUNET_ERROR_TYPE_DEBUG,
584        "Requesting %" PRIu32 " peers with id %" PRIu32 "\n",
585        num_req_peers,
586        rh->id);
587
588   send_request (rps_handle, rh->id, num_req_peers);
589   return rh;
590 }
591
592
593 /**
594  * @brief Callback to collect the peers from the biased stream and put those
595  * into the sampler.
596  *
597  * @param cls The #GNUNET_RPS_Request_Handle
598  * @param num_peers The number of peer that have been returned
599  * @param peers The array of @a num_peers that have been returned
600  */
601 void
602 collect_peers_cb (void *cls,
603                   uint64_t num_peers,
604                   const struct GNUNET_PeerIdentity *peers)
605 {
606   struct GNUNET_RPS_Request_Handle *rh = cls;
607
608   for (uint64_t i = 0; i < num_peers; i++)
609   {
610     RPS_sampler_update (rh->sampler, &peers[i]);
611   }
612 }
613
614
615 /**
616  * @brief Called once the sampler has collected all requested peers.
617  *
618  * Calls the callback provided by the client with the corresponding cls.
619  *
620  * @param peers The array of @a num_peers that has been returned.
621  * @param num_peers The number of peers that have been returned
622  * @param cls The #GNUNET_RPS_Request_Handle
623  */
624 void
625 peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
626                 uint32_t num_peers,
627                 void *cls)
628 {
629   struct GNUNET_RPS_Request_Handle *rh = cls;
630
631   rh->ready_cb (rh->ready_cb_cls,
632                 num_peers,
633                 peers);
634   // TODO cleanup, sampler, rh, cancel stuff
635   // TODO screw this function. We can give the cb,cls directly to the sampler.
636 }
637
638 /**
639  * Request n random peers.
640  *
641  * @param rps_handle handle to the rps service
642  * @param num_req_peers number of peers we want to receive
643  * @param ready_cb the callback called when the peers are available
644  * @param cls closure given to the callback
645  * @return a handle to cancel this request
646  */
647 struct GNUNET_RPS_Request_Handle *
648 GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
649                             uint32_t num_req_peers,
650                             GNUNET_RPS_NotifyReadyCB ready_cb,
651                             void *cls)
652 {
653   struct GNUNET_RPS_Request_Handle *rh;
654
655   rh = new_request_handle (rps_handle,
656                            num_req_peers,
657                            RPS_sampler_mod_init (num_req_peers,
658                                                  GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff
659                            ready_cb,
660                            cls);
661   RPS_sampler_get_n_rand_peers (rh->sampler,
662                                 num_req_peers,
663                                 peers_ready_cb,
664                                 rh);
665
666   GNUNET_RPS_stream_request (rps_handle,
667                              0, /* infinite updates */
668                              collect_peers_cb,
669                              rh); /* cls */
670
671   return rh;
672 }
673
674
675 /**
676  * Seed rps service with peerIDs.
677  *
678  * @param h handle to the rps service
679  * @param n number of peers to seed
680  * @param ids the ids of the peers seeded
681  */
682 void
683 GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
684                      uint32_t n,
685                      const struct GNUNET_PeerIdentity *ids)
686 {
687   size_t size_needed;
688   uint32_t num_peers_max;
689   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
690   struct GNUNET_MQ_Envelope *ev;
691   struct GNUNET_RPS_CS_SeedMessage *msg;
692
693   unsigned int i;
694
695   LOG (GNUNET_ERROR_TYPE_DEBUG,
696        "Client wants to seed %" PRIu32 " peers:\n",
697        n);
698   for (i = 0 ; i < n ; i++)
699     LOG (GNUNET_ERROR_TYPE_DEBUG,
700          "%u. peer: %s\n",
701          i,
702          GNUNET_i2s (&ids[i]));
703
704   /* The actual size the message occupies */
705   size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
706     n * sizeof (struct GNUNET_PeerIdentity);
707   /* The number of peers that fits in one message together with
708    * the respective header */
709   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
710       sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
711     sizeof (struct GNUNET_PeerIdentity);
712   tmp_peer_pointer = ids;
713
714   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
715   {
716     ev = GNUNET_MQ_msg_extra (msg, num_peers_max * sizeof (struct GNUNET_PeerIdentity),
717         GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
718     msg->num_peers = htonl (num_peers_max);
719     GNUNET_memcpy (&msg[1], tmp_peer_pointer, num_peers_max * sizeof (struct GNUNET_PeerIdentity));
720     GNUNET_MQ_send (h->mq, ev);
721
722     n -= num_peers_max;
723     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
724                   n * sizeof (struct GNUNET_PeerIdentity);
725     /* Set pointer to beginning of next block of num_peers_max peers */
726     tmp_peer_pointer = &ids[num_peers_max];
727   }
728
729   ev = GNUNET_MQ_msg_extra (msg, n * sizeof (struct GNUNET_PeerIdentity),
730                             GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
731   msg->num_peers = htonl (n);
732   GNUNET_memcpy (&msg[1], tmp_peer_pointer, n * sizeof (struct GNUNET_PeerIdentity));
733
734   GNUNET_MQ_send (h->mq, ev);
735 }
736
737
738 #ifdef ENABLE_MALICIOUS
739 /**
740  * Turn RPS service to act malicious.
741  *
742  * @param h handle to the rps service
743  * @param type which type of malicious peer to turn to.
744  *             0 Don't act malicious at all
745  *             1 Try to maximise representation
746  *             2 Try to partition the network
747  *               (isolate one peer from the rest)
748  * @param n number of @a ids
749  * @param ids the ids of the malicious peers
750  *            if @type is 2 the last id is the id of the
751  *            peer to be isolated from the rest
752  */
753 void
754 GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
755                           uint32_t type,
756                           uint32_t num_peers,
757                           const struct GNUNET_PeerIdentity *peer_ids,
758                           const struct GNUNET_PeerIdentity *target_peer)
759 {
760   size_t size_needed;
761   uint32_t num_peers_max;
762   const struct GNUNET_PeerIdentity *tmp_peer_pointer;
763   struct GNUNET_MQ_Envelope *ev;
764   struct GNUNET_RPS_CS_ActMaliciousMessage *msg;
765
766   unsigned int i;
767
768   LOG (GNUNET_ERROR_TYPE_DEBUG,
769        "Client turns malicious (type %" PRIu32 ") with %" PRIu32 " other peers:\n",
770        type,
771        num_peers);
772   for (i = 0 ; i < num_peers ; i++)
773     LOG (GNUNET_ERROR_TYPE_DEBUG,
774          "%u. peer: %s\n",
775          i,
776          GNUNET_i2s (&peer_ids[i]));
777
778   /* The actual size the message would occupy */
779   size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
780     num_peers * sizeof (struct GNUNET_PeerIdentity);
781   /* The number of peers that fit in one message together with
782    * the respective header */
783   num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
784       sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
785     sizeof (struct GNUNET_PeerIdentity);
786   tmp_peer_pointer = peer_ids;
787
788   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
789   {
790     LOG (GNUNET_ERROR_TYPE_DEBUG,
791          "Too many peers to send at once, sending %" PRIu32 " (all we can so far)\n",
792          num_peers_max);
793     ev = GNUNET_MQ_msg_extra (msg,
794                               num_peers_max * sizeof (struct GNUNET_PeerIdentity),
795                               GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
796     msg->type = htonl (type);
797     msg->num_peers = htonl (num_peers_max);
798     if ( (2 == type) ||
799          (3 == type) )
800       msg->attacked_peer = peer_ids[num_peers];
801     GNUNET_memcpy (&msg[1],
802             tmp_peer_pointer,
803             num_peers_max * sizeof (struct GNUNET_PeerIdentity));
804
805     GNUNET_MQ_send (h->mq, ev);
806
807     num_peers -= num_peers_max;
808     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
809                   num_peers * sizeof (struct GNUNET_PeerIdentity);
810     /* Set pointer to beginning of next block of num_peers_max peers */
811     tmp_peer_pointer = &peer_ids[num_peers_max];
812   }
813
814   ev = GNUNET_MQ_msg_extra (msg,
815                             num_peers * sizeof (struct GNUNET_PeerIdentity),
816                             GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
817   msg->type = htonl (type);
818   msg->num_peers = htonl (num_peers);
819   if ( (2 == type) ||
820        (3 == type) )
821     msg->attacked_peer = *target_peer;
822   GNUNET_memcpy (&msg[1], tmp_peer_pointer, num_peers * sizeof (struct GNUNET_PeerIdentity));
823
824   GNUNET_MQ_send (h->mq, ev);
825 }
826 #endif /* ENABLE_MALICIOUS */
827
828
829 /**
830  * Cancle an issued request.
831  *
832  * @param rh request handle of request to cancle
833  */
834 void
835 GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
836 {
837   struct GNUNET_RPS_Handle *h;
838   struct GNUNET_MQ_Envelope *ev;
839   struct GNUNET_RPS_CS_RequestCancelMessage*msg;
840
841   LOG (GNUNET_ERROR_TYPE_DEBUG,
842        "Cancelling request with id %" PRIu32 "\n",
843        rh->id);
844
845   h = rh->rps_handle;
846   GNUNET_assert (GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers,
847         rh->id));
848   GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, rh->id);
849   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL);
850   msg->id = htonl (rh->id);
851   GNUNET_MQ_send (rh->rps_handle->mq, ev);
852 }
853
854
855 /**
856  * Disconnect from the rps service
857  *
858  * @param h the handle to the rps service
859  */
860 void
861 GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
862 {
863   GNUNET_MQ_destroy (h->mq);
864   if (0 < GNUNET_CONTAINER_multihashmap32_size (h->req_handlers))
865     LOG (GNUNET_ERROR_TYPE_WARNING,
866         "Still waiting for requests\n");
867   GNUNET_CONTAINER_multihashmap32_destroy (h->req_handlers);
868   GNUNET_free (h);
869 }
870
871
872 /* end of rps_api.c */