glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14 */
15 /**
16  * @file set/gnunet-service-set.c
17  * @brief two-peer set operations
18  * @author Florian Dold
19  * @author Christian Grothoff
20  */
21 #include "gnunet-service-set.h"
22 #include "gnunet-service-set_union.h"
23 #include "gnunet-service-set_intersection.h"
24 #include "gnunet-service-set_protocol.h"
25 #include "gnunet_statistics_service.h"
26
27 /**
28  * How long do we hold on to an incoming channel if there is
29  * no local listener before giving up?
30  */
31 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
32
33
34 /**
35  * Lazy copy requests made by a client.
36  */
37 struct LazyCopyRequest
38 {
39   /**
40    * Kept in a DLL.
41    */
42   struct LazyCopyRequest *prev;
43
44   /**
45    * Kept in a DLL.
46    */
47   struct LazyCopyRequest *next;
48
49   /**
50    * Which set are we supposed to copy?
51    */
52   struct Set *source_set;
53
54   /**
55    * Cookie identifying the request.
56    */
57   uint32_t cookie;
58
59 };
60
61
62 /**
63  * A listener is inhabited by a client, and waits for evaluation
64  * requests from remote peers.
65  */
66 struct Listener
67 {
68   /**
69    * Listeners are held in a doubly linked list.
70    */
71   struct Listener *next;
72
73   /**
74    * Listeners are held in a doubly linked list.
75    */
76   struct Listener *prev;
77
78   /**
79    * Head of DLL of operations this listener is responsible for.
80    * Once the client has accepted/declined the operation, the
81    * operation is moved to the respective set's operation DLLS.
82    */
83   struct Operation *op_head;
84
85   /**
86    * Tail of DLL of operations this listener is responsible for.
87    * Once the client has accepted/declined the operation, the
88    * operation is moved to the respective set's operation DLLS.
89    */
90   struct Operation *op_tail;
91
92   /**
93    * Client that owns the listener.
94    * Only one client may own a listener.
95    */
96   struct ClientState *cs;
97
98   /**
99    * The port we are listening on with CADET.
100    */
101   struct GNUNET_CADET_Port *open_port;
102
103   /**
104    * Application ID for the operation, used to distinguish
105    * multiple operations of the same type with the same peer.
106    */
107   struct GNUNET_HashCode app_id;
108
109   /**
110    * The type of the operation.
111    */
112   enum GNUNET_SET_OperationType operation;
113 };
114
115
116 /**
117  * Handle to the cadet service, used to listen for and connect to
118  * remote peers.
119  */
120 static struct GNUNET_CADET_Handle *cadet;
121
122 /**
123  * DLL of lazy copy requests by this client.
124  */
125 static struct LazyCopyRequest *lazy_copy_head;
126
127 /**
128  * DLL of lazy copy requests by this client.
129  */
130 static struct LazyCopyRequest *lazy_copy_tail;
131
132 /**
133  * Generator for unique cookie we set per lazy copy request.
134  */
135 static uint32_t lazy_copy_cookie;
136
137 /**
138  * Statistics handle.
139  */
140 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
141
142 /**
143  * Listeners are held in a doubly linked list.
144  */
145 static struct Listener *listener_head;
146
147 /**
148  * Listeners are held in a doubly linked list.
149  */
150 static struct Listener *listener_tail;
151
152 /**
153  * Number of active clients.
154  */
155 static unsigned int num_clients;
156
157 /**
158  * Are we in shutdown? if #GNUNET_YES and the number of clients
159  * drops to zero, disconnect from CADET.
160  */
161 static int in_shutdown;
162
163 /**
164  * Counter for allocating unique IDs for clients, used to identify
165  * incoming operation requests from remote peers, that the client can
166  * choose to accept or refuse.  0 must not be used (reserved for
167  * uninitialized).
168  */
169 static uint32_t suggest_id;
170
171
172 /**
173  * Get the incoming socket associated with the given id.
174  *
175  * @param listener the listener to look in
176  * @param id id to look for
177  * @return the incoming socket associated with the id,
178  *         or NULL if there is none
179  */
180 static struct Operation *
181 get_incoming (uint32_t id)
182 {
183   for (struct Listener *listener = listener_head;
184        NULL != listener;
185        listener = listener->next)
186   {
187     for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
188       if (op->suggest_id == id)
189         return op;
190   }
191   return NULL;
192 }
193
194
195 /**
196  * Destroy an incoming request from a remote peer
197  *
198  * @param op remote request to destroy
199  */
200 static void
201 incoming_destroy (struct Operation *op)
202 {
203   struct Listener *listener;
204   struct GNUNET_CADET_Channel *channel;
205
206   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
207               "Destroying incoming operation %p\n",
208               op);
209   if (NULL != (listener = op->listener))
210   {
211     GNUNET_CONTAINER_DLL_remove (listener->op_head,
212                                  listener->op_tail,
213                                  op);
214     op->listener = NULL;
215   }
216   if (NULL != op->timeout_task)
217   {
218     GNUNET_SCHEDULER_cancel (op->timeout_task);
219     op->timeout_task = NULL;
220   }
221   if (NULL != (channel = op->channel))
222   {
223     op->channel = NULL;
224     GNUNET_CADET_channel_destroy (channel);
225   }
226 }
227
228
229 /**
230  * Context for the #garbage_collect_cb().
231  */
232 struct GarbageContext
233 {
234
235   /**
236    * Map for which we are garbage collecting removed elements.
237    */
238   struct GNUNET_CONTAINER_MultiHashMap *map;
239
240   /**
241    * Lowest generation for which an operation is still pending.
242    */
243   unsigned int min_op_generation;
244
245   /**
246    * Largest generation for which an operation is still pending.
247    */
248   unsigned int max_op_generation;
249
250 };
251
252
253 /**
254  * Function invoked to check if an element can be removed from
255  * the set's history because it is no longer needed.
256  *
257  * @param cls the `struct GarbageContext *`
258  * @param key key of the element in the map
259  * @param value the `struct ElementEntry *`
260  * @return #GNUNET_OK (continue to iterate)
261  */
262 static int
263 garbage_collect_cb (void *cls,
264                     const struct GNUNET_HashCode *key,
265                     void *value)
266 {
267   //struct GarbageContext *gc = cls;
268   //struct ElementEntry *ee = value;
269
270   //if (GNUNET_YES != ee->removed)
271   //  return GNUNET_OK;
272   //if ( (gc->max_op_generation < ee->generation_added) ||
273   //     (ee->generation_removed > gc->min_op_generation) )
274   //{
275   //  GNUNET_assert (GNUNET_YES ==
276   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
277   //                                                       key,
278   //                                                       ee));
279   //  GNUNET_free (ee);
280   //}
281   return GNUNET_OK;
282 }
283
284
285 /**
286  * Collect and destroy elements that are not needed anymore, because
287  * their lifetime (as determined by their generation) does not overlap
288  * with any active set operation.
289  *
290  * @param set set to garbage collect
291  */
292 static void
293 collect_generation_garbage (struct Set *set)
294 {
295   struct GarbageContext gc;
296
297   gc.min_op_generation = UINT_MAX;
298   gc.max_op_generation = 0;
299   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
300   {
301     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
302                                        op->generation_created);
303     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
304                                        op->generation_created);
305   }
306   gc.map = set->content->elements;
307   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
308                                          &garbage_collect_cb,
309                                          &gc);
310 }
311
312
313 /**
314  * Is @a generation in the range of exclusions?
315  *
316  * @param generation generation to query
317  * @param excluded array of generations where the element is excluded
318  * @param excluded_size length of the @a excluded array
319  * @return #GNUNET_YES if @a generation is in any of the ranges
320  */
321 static int
322 is_excluded_generation (unsigned int generation,
323                         struct GenerationRange *excluded,
324                         unsigned int excluded_size)
325 {
326   for (unsigned int i = 0; i < excluded_size; i++)
327     if ( (generation >= excluded[i].start) &&
328          (generation < excluded[i].end) )
329       return GNUNET_YES;
330   return GNUNET_NO;
331 }
332
333
334 /**
335  * Is element @a ee part of the set during @a query_generation?
336  *
337  * @param ee element to test
338  * @param query_generation generation to query
339  * @param excluded array of generations where the element is excluded
340  * @param excluded_size length of the @a excluded array
341  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
342  */
343 static int
344 is_element_of_generation (struct ElementEntry *ee,
345                           unsigned int query_generation,
346                           struct GenerationRange *excluded,
347                           unsigned int excluded_size)
348 {
349   struct MutationEvent *mut;
350   int is_present;
351
352   GNUNET_assert (NULL != ee->mutations);
353   if (GNUNET_YES ==
354       is_excluded_generation (query_generation,
355                               excluded,
356                               excluded_size))
357   {
358     GNUNET_break (0);
359     return GNUNET_NO;
360   }
361
362   is_present = GNUNET_NO;
363
364   /* Could be made faster with binary search, but lists
365      are small, so why bother. */
366   for (unsigned int i = 0; i < ee->mutations_size; i++)
367   {
368     mut = &ee->mutations[i];
369
370     if (mut->generation > query_generation)
371     {
372       /* The mutation doesn't apply to our generation
373          anymore.  We can'b break here, since mutations aren't
374          sorted by generation. */
375       continue;
376     }
377
378     if (GNUNET_YES ==
379         is_excluded_generation (mut->generation,
380                                 excluded,
381                                 excluded_size))
382     {
383       /* The generation is excluded (because it belongs to another
384          fork via a lazy copy) and thus mutations aren't considered
385          for membership testing. */
386       continue;
387     }
388
389     /* This would be an inconsistency in how we manage mutations. */
390     if ( (GNUNET_YES == is_present) &&
391          (GNUNET_YES == mut->added) )
392       GNUNET_assert (0);
393     /* Likewise. */
394     if ( (GNUNET_NO == is_present) &&
395          (GNUNET_NO == mut->added) )
396       GNUNET_assert (0);
397
398     is_present = mut->added;
399   }
400
401   return is_present;
402 }
403
404
405 /**
406  * Is element @a ee part of the set used by @a op?
407  *
408  * @param ee element to test
409  * @param op operation the defines the set and its generation
410  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
411  */
412 int
413 _GSS_is_element_of_operation (struct ElementEntry *ee,
414                               struct Operation *op)
415 {
416   return is_element_of_generation (ee,
417                                    op->generation_created,
418                                    op->set->excluded_generations,
419                                    op->set->excluded_generations_size);
420 }
421
422
423 /**
424  * Destroy the given operation.  Used for any operation where both
425  * peers were known and that thus actually had a vt and channel.  Must
426  * not be used for operations where 'listener' is still set and we do
427  * not know the other peer.
428  *
429  * Call the implementation-specific cancel function of the operation.
430  * Disconnects from the remote peer.  Does not disconnect the client,
431  * as there may be multiple operations per set.
432  *
433  * @param op operation to destroy
434  * @param gc #GNUNET_YES to perform garbage collection on the set
435  */
436 void
437 _GSS_operation_destroy (struct Operation *op,
438                         int gc)
439 {
440   struct Set *set = op->set;
441   struct GNUNET_CADET_Channel *channel;
442
443   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
444               "Destroying operation %p\n",
445               op);
446   GNUNET_assert (NULL == op->listener);
447   if (NULL != op->state)
448   {
449     set->vt->cancel (op);
450     op->state = NULL;
451   }
452   if (NULL != set)
453   {
454     GNUNET_CONTAINER_DLL_remove (set->ops_head,
455                                  set->ops_tail,
456                                  op);
457     op->set = NULL;
458   }
459   if (NULL != op->context_msg)
460   {
461     GNUNET_free (op->context_msg);
462     op->context_msg = NULL;
463   }
464   if (NULL != (channel = op->channel))
465   {
466     /* This will free op; called conditionally as this helper function
467        is also called from within the channel disconnect handler. */
468     op->channel = NULL;
469     GNUNET_CADET_channel_destroy (channel);
470   }
471   if ( (NULL != set) &&
472        (GNUNET_YES == gc) )
473     collect_generation_garbage (set);
474   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
475    * there was a channel end handler that will free 'op' on the call stack. */
476 }
477
478
479 /**
480  * Callback called when a client connects to the service.
481  *
482  * @param cls closure for the service
483  * @param c the new client that connected to the service
484  * @param mq the message queue used to send messages to the client
485  * @return @a `struct ClientState`
486  */
487 static void *
488 client_connect_cb (void *cls,
489                    struct GNUNET_SERVICE_Client *c,
490                    struct GNUNET_MQ_Handle *mq)
491 {
492   struct ClientState *cs;
493
494   num_clients++;
495   cs = GNUNET_new (struct ClientState);
496   cs->client = c;
497   cs->mq = mq;
498   return cs;
499 }
500
501
502 /**
503  * Iterator over hash map entries to free element entries.
504  *
505  * @param cls closure
506  * @param key current key code
507  * @param value a `struct ElementEntry *` to be free'd
508  * @return #GNUNET_YES (continue to iterate)
509  */
510 static int
511 destroy_elements_iterator (void *cls,
512                            const struct GNUNET_HashCode *key,
513                            void *value)
514 {
515   struct ElementEntry *ee = value;
516
517   GNUNET_free_non_null (ee->mutations);
518   GNUNET_free (ee);
519   return GNUNET_YES;
520 }
521
522
523 /**
524  * Clean up after a client has disconnected
525  *
526  * @param cls closure, unused
527  * @param client the client to clean up after
528  * @param internal_cls the `struct ClientState`
529  */
530 static void
531 client_disconnect_cb (void *cls,
532                       struct GNUNET_SERVICE_Client *client,
533                       void *internal_cls)
534 {
535   struct ClientState *cs = internal_cls;
536   struct Operation *op;
537   struct Listener *listener;
538   struct Set *set;
539
540   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
541               "Client disconnected, cleaning up\n");
542   if (NULL != (set = cs->set))
543   {
544     struct SetContent *content = set->content;
545     struct PendingMutation *pm;
546     struct PendingMutation *pm_current;
547     struct LazyCopyRequest *lcr;
548
549     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550                 "Destroying client's set\n");
551     /* Destroy pending set operations */
552     while (NULL != set->ops_head)
553       _GSS_operation_destroy (set->ops_head,
554                               GNUNET_NO);
555
556     /* Destroy operation-specific state */
557     GNUNET_assert (NULL != set->state);
558     set->vt->destroy_set (set->state);
559     set->state = NULL;
560
561     /* Clean up ongoing iterations */
562     if (NULL != set->iter)
563     {
564       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
565       set->iter = NULL;
566       set->iteration_id++;
567     }
568
569     /* discard any pending mutations that reference this set */
570     pm = content->pending_mutations_head;
571     while (NULL != pm)
572     {
573       pm_current = pm;
574       pm = pm->next;
575       if (pm_current->set == set)
576       {
577         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
578                                      content->pending_mutations_tail,
579                                      pm_current);
580         GNUNET_free (pm_current);
581       }
582     }
583
584     /* free set content (or at least decrement RC) */
585     set->content = NULL;
586     GNUNET_assert (0 != content->refcount);
587     content->refcount--;
588     if (0 == content->refcount)
589     {
590       GNUNET_assert (NULL != content->elements);
591       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
592                                              &destroy_elements_iterator,
593                                              NULL);
594       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
595       content->elements = NULL;
596       GNUNET_free (content);
597     }
598     GNUNET_free_non_null (set->excluded_generations);
599     set->excluded_generations = NULL;
600
601     /* remove set from pending copy requests */
602     lcr = lazy_copy_head;
603     while (NULL != lcr)
604     {
605       struct LazyCopyRequest *lcr_current = lcr;
606
607       lcr = lcr->next;
608       if (lcr_current->source_set == set)
609       {
610         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
611                                      lazy_copy_tail,
612                                      lcr_current);
613         GNUNET_free (lcr_current);
614       }
615     }
616     GNUNET_free (set);
617   }
618
619   if (NULL != (listener = cs->listener))
620   {
621     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622                 "Destroying client's listener\n");
623     GNUNET_CADET_close_port (listener->open_port);
624     listener->open_port = NULL;
625     while (NULL != (op = listener->op_head))
626     {
627       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
628                   "Destroying incoming operation `%u' from peer `%s'\n",
629                   (unsigned int) op->client_request_id,
630                   GNUNET_i2s (&op->peer));
631       incoming_destroy (op);
632     }
633     GNUNET_CONTAINER_DLL_remove (listener_head,
634                                  listener_tail,
635                                  listener);
636     GNUNET_free (listener);
637   }
638   GNUNET_free (cs);
639   num_clients--;
640   if ( (GNUNET_YES == in_shutdown) &&
641        (0 == num_clients) )
642   {
643     if (NULL != cadet)
644     {
645       GNUNET_CADET_disconnect (cadet);
646       cadet = NULL;
647     }
648   }
649 }
650
651
652 /**
653  * Check a request for a set operation from another peer.
654  *
655  * @param cls the operation state
656  * @param msg the received message
657  * @return #GNUNET_OK if the channel should be kept alive,
658  *         #GNUNET_SYSERR to destroy the channel
659  */
660 static int
661 check_incoming_msg (void *cls,
662                     const struct OperationRequestMessage *msg)
663 {
664   struct Operation *op = cls;
665   struct Listener *listener = op->listener;
666   const struct GNUNET_MessageHeader *nested_context;
667
668   /* double operation request */
669   if (0 != op->suggest_id)
670   {
671     GNUNET_break_op (0);
672     return GNUNET_SYSERR;
673   }
674   /* This should be equivalent to the previous condition, but can't hurt to check twice */
675   if (NULL == op->listener)
676   {
677     GNUNET_break (0);
678     return GNUNET_SYSERR;
679   }
680   if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
681   {
682     GNUNET_break_op (0);
683     return GNUNET_SYSERR;
684   }
685   nested_context = GNUNET_MQ_extract_nested_mh (msg);
686   if ( (NULL != nested_context) &&
687        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
688   {
689     GNUNET_break_op (0);
690     return GNUNET_SYSERR;
691   }
692   return GNUNET_OK;
693 }
694
695
696 /**
697  * Handle a request for a set operation from another peer.  Checks if we
698  * have a listener waiting for such a request (and in that case initiates
699  * asking the listener about accepting the connection). If no listener
700  * is waiting, we queue the operation request in hope that a listener
701  * shows up soon (before timeout).
702  *
703  * This msg is expected as the first and only msg handled through the
704  * non-operation bound virtual table, acceptance of this operation replaces
705  * our virtual table and subsequent msgs would be routed differently (as
706  * we then know what type of operation this is).
707  *
708  * @param cls the operation state
709  * @param msg the received message
710  * @return #GNUNET_OK if the channel should be kept alive,
711  *         #GNUNET_SYSERR to destroy the channel
712  */
713 static void
714 handle_incoming_msg (void *cls,
715                      const struct OperationRequestMessage *msg)
716 {
717   struct Operation *op = cls;
718   struct Listener *listener = op->listener;
719   const struct GNUNET_MessageHeader *nested_context;
720   struct GNUNET_MQ_Envelope *env;
721   struct GNUNET_SET_RequestMessage *cmsg;
722
723   nested_context = GNUNET_MQ_extract_nested_mh (msg);
724   /* Make a copy of the nested_context (application-specific context
725      information that is opaque to set) so we can pass it to the
726      listener later on */
727   if (NULL != nested_context)
728     op->context_msg = GNUNET_copy_message (nested_context);
729   op->remote_element_count = ntohl (msg->element_count);
730   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
731               "Received P2P operation request (op %u, port %s) for active listener\n",
732               (uint32_t) ntohl (msg->operation),
733               GNUNET_h2s (&op->listener->app_id));
734   GNUNET_assert (0 == op->suggest_id);
735   if (0 == suggest_id)
736     suggest_id++;
737   op->suggest_id = suggest_id++;
738   GNUNET_assert (NULL != op->timeout_task);
739   GNUNET_SCHEDULER_cancel (op->timeout_task);
740   op->timeout_task = NULL;
741   env = GNUNET_MQ_msg_nested_mh (cmsg,
742                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
743                                  op->context_msg);
744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745               "Suggesting incoming request with accept id %u to listener %p of client %p\n",
746               op->suggest_id,
747               listener,
748               listener->cs);
749   cmsg->accept_id = htonl (op->suggest_id);
750   cmsg->peer_id = op->peer;
751   GNUNET_MQ_send (listener->cs->mq,
752                   env);
753   /* NOTE: GNUNET_CADET_receive_done() will be called in
754      #handle_client_accept() */
755 }
756
757
758 /**
759  * Add an element to @a set as specified by @a msg
760  *
761  * @param set set to manipulate
762  * @param msg message specifying the change
763  */
764 static void
765 execute_add (struct Set *set,
766              const struct GNUNET_SET_ElementMessage *msg)
767 {
768   struct GNUNET_SET_Element el;
769   struct ElementEntry *ee;
770   struct GNUNET_HashCode hash;
771
772   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
773   el.size = ntohs (msg->header.size) - sizeof (*msg);
774   el.data = &msg[1];
775   el.element_type = ntohs (msg->element_type);
776   GNUNET_SET_element_hash (&el,
777                            &hash);
778   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
779                                           &hash);
780   if (NULL == ee)
781   {
782     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783                 "Client inserts element %s of size %u\n",
784                 GNUNET_h2s (&hash),
785                 el.size);
786     ee = GNUNET_malloc (el.size + sizeof (*ee));
787     ee->element.size = el.size;
788     GNUNET_memcpy (&ee[1],
789             el.data,
790             el.size);
791     ee->element.data = &ee[1];
792     ee->element.element_type = el.element_type;
793     ee->remote = GNUNET_NO;
794     ee->mutations = NULL;
795     ee->mutations_size = 0;
796     ee->element_hash = hash;
797     GNUNET_break (GNUNET_YES ==
798                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
799                                                      &ee->element_hash,
800                                                      ee,
801                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
802   }
803   else if (GNUNET_YES ==
804            is_element_of_generation (ee,
805                                      set->current_generation,
806                                      set->excluded_generations,
807                                      set->excluded_generations_size))
808   {
809     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810                 "Client inserted element %s of size %u twice (ignored)\n",
811                 GNUNET_h2s (&hash),
812                 el.size);
813
814     /* same element inserted twice */
815     return;
816   }
817
818   {
819     struct MutationEvent mut = {
820       .generation = set->current_generation,
821       .added = GNUNET_YES
822     };
823     GNUNET_array_append (ee->mutations,
824                          ee->mutations_size,
825                          mut);
826   }
827   set->vt->add (set->state,
828                 ee);
829 }
830
831
832 /**
833  * Remove an element from @a set as specified by @a msg
834  *
835  * @param set set to manipulate
836  * @param msg message specifying the change
837  */
838 static void
839 execute_remove (struct Set *set,
840                 const struct GNUNET_SET_ElementMessage *msg)
841 {
842   struct GNUNET_SET_Element el;
843   struct ElementEntry *ee;
844   struct GNUNET_HashCode hash;
845
846   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
847   el.size = ntohs (msg->header.size) - sizeof (*msg);
848   el.data = &msg[1];
849   el.element_type = ntohs (msg->element_type);
850   GNUNET_SET_element_hash (&el, &hash);
851   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
852                                           &hash);
853   if (NULL == ee)
854   {
855     /* Client tried to remove non-existing element. */
856     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857                 "Client removes non-existing element of size %u\n",
858                 el.size);
859     return;
860   }
861   if (GNUNET_NO ==
862       is_element_of_generation (ee,
863                                 set->current_generation,
864                                 set->excluded_generations,
865                                 set->excluded_generations_size))
866   {
867     /* Client tried to remove element twice */
868     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869                 "Client removed element of size %u twice (ignored)\n",
870                 el.size);
871     return;
872   }
873   else
874   {
875     struct MutationEvent mut = {
876       .generation = set->current_generation,
877       .added = GNUNET_NO
878     };
879
880     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881                 "Client removes element of size %u\n",
882                 el.size);
883
884     GNUNET_array_append (ee->mutations,
885                          ee->mutations_size,
886                          mut);
887   }
888   set->vt->remove (set->state,
889                    ee);
890 }
891
892
893 /**
894  * Perform a mutation on a set as specified by the @a msg
895  *
896  * @param set the set to mutate
897  * @param msg specification of what to change
898  */
899 static void
900 execute_mutation (struct Set *set,
901                   const struct GNUNET_SET_ElementMessage *msg)
902 {
903   switch (ntohs (msg->header.type))
904   {
905     case GNUNET_MESSAGE_TYPE_SET_ADD:
906       execute_add (set, msg);
907       break;
908     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
909       execute_remove (set, msg);
910       break;
911     default:
912       GNUNET_break (0);
913   }
914 }
915
916
917 /**
918  * Execute mutations that were delayed on a set because of
919  * pending operations.
920  *
921  * @param set the set to execute mutations on
922  */
923 static void
924 execute_delayed_mutations (struct Set *set)
925 {
926   struct PendingMutation *pm;
927
928   if (0 != set->content->iterator_count)
929     return; /* still cannot do this */
930   while (NULL != (pm = set->content->pending_mutations_head))
931   {
932     GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
933                                  set->content->pending_mutations_tail,
934                                  pm);
935     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936                 "Executing pending mutation on %p.\n",
937                 pm->set);
938     execute_mutation (pm->set,
939                       pm->msg);
940     GNUNET_free (pm->msg);
941     GNUNET_free (pm);
942   }
943 }
944
945
946 /**
947  * Send the next element of a set to the set's client.  The next element is given by
948  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
949  * are no more elements in the set.  The caller must ensure that the set's iterator is
950  * valid.
951  *
952  * The client will acknowledge each received element with a
953  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
954  * #handle_client_iter_ack() will then trigger the next transmission.
955  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
956  *
957  * @param set set that should send its next element to its client
958  */
959 static void
960 send_client_element (struct Set *set)
961 {
962   int ret;
963   struct ElementEntry *ee;
964   struct GNUNET_MQ_Envelope *ev;
965   struct GNUNET_SET_IterResponseMessage *msg;
966
967   GNUNET_assert (NULL != set->iter);
968   do {
969     ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
970                                                        NULL,
971                                                        (const void **) &ee);
972     if (GNUNET_NO == ret)
973     {
974       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975                   "Iteration on %p done.\n",
976                   set);
977       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
978       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
979       set->iter = NULL;
980       set->iteration_id++;
981       GNUNET_assert (set->content->iterator_count > 0);
982       set->content->iterator_count--;
983       execute_delayed_mutations (set);
984       GNUNET_MQ_send (set->cs->mq,
985                       ev);
986       return;
987     }
988     GNUNET_assert (NULL != ee);
989   } while (GNUNET_NO ==
990            is_element_of_generation (ee,
991                                      set->iter_generation,
992                                      set->excluded_generations,
993                                      set->excluded_generations_size));
994   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995               "Sending iteration element on %p.\n",
996               set);
997   ev = GNUNET_MQ_msg_extra (msg,
998                             ee->element.size,
999                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1000   GNUNET_memcpy (&msg[1],
1001                  ee->element.data,
1002                  ee->element.size);
1003   msg->element_type = htons (ee->element.element_type);
1004   msg->iteration_id = htons (set->iteration_id);
1005   GNUNET_MQ_send (set->cs->mq,
1006                   ev);
1007 }
1008
1009
1010 /**
1011  * Called when a client wants to iterate the elements of a set.
1012  * Checks if we have a set associated with the client and if we
1013  * can right now start an iteration. If all checks out, starts
1014  * sending the elements of the set to the client.
1015  *
1016  * @param cls client that sent the message
1017  * @param m message sent by the client
1018  */
1019 static void
1020 handle_client_iterate (void *cls,
1021                        const struct GNUNET_MessageHeader *m)
1022 {
1023   struct ClientState *cs = cls;
1024   struct Set *set;
1025
1026   if (NULL == (set = cs->set))
1027   {
1028     /* attempt to iterate over a non existing set */
1029     GNUNET_break (0);
1030     GNUNET_SERVICE_client_drop (cs->client);
1031     return;
1032   }
1033   if (NULL != set->iter)
1034   {
1035     /* Only one concurrent iterate-action allowed per set */
1036     GNUNET_break (0);
1037     GNUNET_SERVICE_client_drop (cs->client);
1038     return;
1039   }
1040   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041               "Iterating set %p in gen %u with %u content elements\n",
1042               (void *) set,
1043               set->current_generation,
1044               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1045   GNUNET_SERVICE_client_continue (cs->client);
1046   set->content->iterator_count++;
1047   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1048   set->iter_generation = set->current_generation;
1049   send_client_element (set);
1050 }
1051
1052
1053 /**
1054  * Called when a client wants to create a new set.  This is typically
1055  * the first request from a client, and includes the type of set
1056  * operation to be performed.
1057  *
1058  * @param cls client that sent the message
1059  * @param m message sent by the client
1060  */
1061 static void
1062 handle_client_create_set (void *cls,
1063                           const struct GNUNET_SET_CreateMessage *msg)
1064 {
1065   struct ClientState *cs = cls;
1066   struct Set *set;
1067
1068   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069               "Client created new set (operation %u)\n",
1070               (uint32_t) ntohl (msg->operation));
1071   if (NULL != cs->set)
1072   {
1073     /* There can only be one set per client */
1074     GNUNET_break (0);
1075     GNUNET_SERVICE_client_drop (cs->client);
1076     return;
1077   }
1078   set = GNUNET_new (struct Set);
1079   switch (ntohl (msg->operation))
1080   {
1081   case GNUNET_SET_OPERATION_INTERSECTION:
1082     set->vt = _GSS_intersection_vt ();
1083     break;
1084   case GNUNET_SET_OPERATION_UNION:
1085     set->vt = _GSS_union_vt ();
1086     break;
1087   default:
1088     GNUNET_free (set);
1089     GNUNET_break (0);
1090     GNUNET_SERVICE_client_drop (cs->client);
1091     return;
1092   }
1093   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1094   set->state = set->vt->create ();
1095   if (NULL == set->state)
1096   {
1097     /* initialization failed (i.e. out of memory) */
1098     GNUNET_free (set);
1099     GNUNET_SERVICE_client_drop (cs->client);
1100     return;
1101   }
1102   set->content = GNUNET_new (struct SetContent);
1103   set->content->refcount = 1;
1104   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1105                                                                  GNUNET_YES);
1106   set->cs = cs;
1107   cs->set = set;
1108   GNUNET_SERVICE_client_continue (cs->client);
1109 }
1110
1111
1112 /**
1113  * Timeout happens iff:
1114  *  - we suggested an operation to our listener,
1115  *    but did not receive a response in time
1116  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1117  *
1118  * @param cls channel context
1119  * @param tc context information (why was this task triggered now)
1120  */
1121 static void
1122 incoming_timeout_cb (void *cls)
1123 {
1124   struct Operation *op = cls;
1125
1126   op->timeout_task = NULL;
1127   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1128               "Remote peer's incoming request timed out\n");
1129   incoming_destroy (op);
1130 }
1131
1132
1133 /**
1134  * Method called whenever another peer has added us to a channel the
1135  * other peer initiated.  Only called (once) upon reception of data
1136  * from a channel we listen on.
1137  *
1138  * The channel context represents the operation itself and gets added
1139  * to a DLL, from where it gets looked up when our local listener
1140  * client responds to a proposed/suggested operation or connects and
1141  * associates with this operation.
1142  *
1143  * @param cls closure
1144  * @param channel new handle to the channel
1145  * @param source peer that started the channel
1146  * @return initial channel context for the channel
1147  *         returns NULL on error
1148  */
1149 static void *
1150 channel_new_cb (void *cls,
1151                 struct GNUNET_CADET_Channel *channel,
1152                 const struct GNUNET_PeerIdentity *source)
1153 {
1154   struct Listener *listener = cls;
1155   struct Operation *op;
1156
1157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158               "New incoming channel\n");
1159   op = GNUNET_new (struct Operation);
1160   op->listener = listener;
1161   op->peer = *source;
1162   op->channel = channel;
1163   op->mq = GNUNET_CADET_get_mq (op->channel);
1164   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1165                                        UINT32_MAX);
1166   op->timeout_task
1167     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1168                                     &incoming_timeout_cb,
1169                                     op);
1170   GNUNET_CONTAINER_DLL_insert (listener->op_head,
1171                                listener->op_tail,
1172                                op);
1173   return op;
1174 }
1175
1176
1177 /**
1178  * Function called whenever a channel is destroyed.  Should clean up
1179  * any associated state.  It must NOT call
1180  * GNUNET_CADET_channel_destroy() on the channel.
1181  *
1182  * The peer_disconnect function is part of a a virtual table set initially either
1183  * when a peer creates a new channel with us, or once we create
1184  * a new channel ourselves (evaluate).
1185  *
1186  * Once we know the exact type of operation (union/intersection), the vt is
1187  * replaced with an operation specific instance (_GSS_[op]_vt).
1188  *
1189  * @param channel_ctx place where local state associated
1190  *                   with the channel is stored
1191  * @param channel connection to the other end (henceforth invalid)
1192  */
1193 static void
1194 channel_end_cb (void *channel_ctx,
1195                 const struct GNUNET_CADET_Channel *channel)
1196 {
1197   struct Operation *op = channel_ctx;
1198
1199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200               "channel_end_cb called\n");
1201   op->channel = NULL;
1202   if (NULL != op->listener)
1203     incoming_destroy (op);
1204   else if (NULL != op->set)
1205     op->set->vt->channel_death (op);
1206   else
1207     _GSS_operation_destroy (op,
1208                             GNUNET_YES);
1209   GNUNET_free (op);
1210 }
1211
1212
1213 /**
1214  * Function called whenever an MQ-channel's transmission window size changes.
1215  *
1216  * The first callback in an outgoing channel will be with a non-zero value
1217  * and will mean the channel is connected to the destination.
1218  *
1219  * For an incoming channel it will be called immediately after the
1220  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1221  *
1222  * @param cls Channel closure.
1223  * @param channel Connection to the other end (henceforth invalid).
1224  * @param window_size New window size. If the is more messages than buffer size
1225  *                    this value will be negative..
1226  */
1227 static void
1228 channel_window_cb (void *cls,
1229                    const struct GNUNET_CADET_Channel *channel,
1230                    int window_size)
1231 {
1232   /* FIXME: not implemented, we could do flow control here... */
1233 }
1234
1235
1236 /**
1237  * Called when a client wants to create a new listener.
1238  *
1239  * @param cls client that sent the message
1240  * @param msg message sent by the client
1241  */
1242 static void
1243 handle_client_listen (void *cls,
1244                       const struct GNUNET_SET_ListenMessage *msg)
1245 {
1246   struct ClientState *cs = cls;
1247   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1248     GNUNET_MQ_hd_var_size (incoming_msg,
1249                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1250                            struct OperationRequestMessage,
1251                            NULL),
1252     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1253                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1254                            struct IBFMessage,
1255                            NULL),
1256     GNUNET_MQ_hd_var_size (union_p2p_elements,
1257                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1258                            struct GNUNET_SET_ElementMessage,
1259                            NULL),
1260     GNUNET_MQ_hd_var_size (union_p2p_offer,
1261                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1262                            struct GNUNET_MessageHeader,
1263                            NULL),
1264     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1265                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1266                            struct InquiryMessage,
1267                            NULL),
1268     GNUNET_MQ_hd_var_size (union_p2p_demand,
1269                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1270                            struct GNUNET_MessageHeader,
1271                            NULL),
1272     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1273                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1274                              struct GNUNET_MessageHeader,
1275                              NULL),
1276     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1277                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1278                              struct GNUNET_MessageHeader,
1279                              NULL),
1280     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1281                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1282                              struct GNUNET_MessageHeader,
1283                              NULL),
1284     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1285                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1286                              struct GNUNET_MessageHeader,
1287                              NULL),
1288     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1289                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1290                            struct StrataEstimatorMessage,
1291                            NULL),
1292     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1293                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1294                            struct StrataEstimatorMessage,
1295                            NULL),
1296     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1297                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1298                            struct GNUNET_SET_ElementMessage,
1299                            NULL),
1300     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1301                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1302                              struct IntersectionElementInfoMessage,
1303                              NULL),
1304     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1305                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1306                            struct BFMessage,
1307                            NULL),
1308     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1309                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1310                              struct IntersectionDoneMessage,
1311                              NULL),
1312     GNUNET_MQ_handler_end ()
1313   };
1314   struct Listener *listener;
1315
1316   if (NULL != cs->listener)
1317   {
1318     /* max. one active listener per client! */
1319     GNUNET_break (0);
1320     GNUNET_SERVICE_client_drop (cs->client);
1321     return;
1322   }
1323   listener = GNUNET_new (struct Listener);
1324   listener->cs = cs;
1325   cs->listener = listener;
1326   listener->app_id = msg->app_id;
1327   listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1328   GNUNET_CONTAINER_DLL_insert (listener_head,
1329                                listener_tail,
1330                                listener);
1331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332               "New listener created (op %u, port %s)\n",
1333               listener->operation,
1334               GNUNET_h2s (&listener->app_id));
1335   listener->open_port
1336     = GNUNET_CADET_open_port (cadet,
1337                               &msg->app_id,
1338                               &channel_new_cb,
1339                               listener,
1340                               &channel_window_cb,
1341                               &channel_end_cb,
1342                               cadet_handlers);
1343   GNUNET_SERVICE_client_continue (cs->client);
1344 }
1345
1346
1347 /**
1348  * Called when the listening client rejects an operation
1349  * request by another peer.
1350  *
1351  * @param cls client that sent the message
1352  * @param msg message sent by the client
1353  */
1354 static void
1355 handle_client_reject (void *cls,
1356                       const struct GNUNET_SET_RejectMessage *msg)
1357 {
1358   struct ClientState *cs = cls;
1359   struct Operation *op;
1360
1361   op = get_incoming (ntohl (msg->accept_reject_id));
1362   if (NULL == op)
1363   {
1364     /* no matching incoming operation for this reject;
1365        could be that the other peer already disconnected... */
1366     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1367                 "Client rejected unknown operation %u\n",
1368                 (unsigned int) ntohl (msg->accept_reject_id));
1369     GNUNET_SERVICE_client_continue (cs->client);
1370     return;
1371   }
1372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373               "Peer request (op %u, app %s) rejected by client\n",
1374               op->listener->operation,
1375               GNUNET_h2s (&cs->listener->app_id));
1376   GNUNET_CADET_channel_destroy (op->channel);
1377   GNUNET_SERVICE_client_continue (cs->client);
1378 }
1379
1380
1381 /**
1382  * Called when a client wants to add or remove an element to a set it inhabits.
1383  *
1384  * @param cls client that sent the message
1385  * @param msg message sent by the client
1386  */
1387 static int
1388 check_client_mutation (void *cls,
1389                        const struct GNUNET_SET_ElementMessage *msg)
1390 {
1391   /* NOTE: Technically, we should probably check with the
1392      block library whether the element we are given is well-formed */
1393   return GNUNET_OK;
1394 }
1395
1396
1397 /**
1398  * Called when a client wants to add or remove an element to a set it inhabits.
1399  *
1400  * @param cls client that sent the message
1401  * @param msg message sent by the client
1402  */
1403 static void
1404 handle_client_mutation (void *cls,
1405                         const struct GNUNET_SET_ElementMessage *msg)
1406 {
1407   struct ClientState *cs = cls;
1408   struct Set *set;
1409
1410   if (NULL == (set = cs->set))
1411   {
1412     /* client without a set requested an operation */
1413     GNUNET_break (0);
1414     GNUNET_SERVICE_client_drop (cs->client);
1415     return;
1416   }
1417   GNUNET_SERVICE_client_continue (cs->client);
1418
1419   if (0 != set->content->iterator_count)
1420   {
1421     struct PendingMutation *pm;
1422
1423     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1424                 "Scheduling mutation on set\n");
1425     pm = GNUNET_new (struct PendingMutation);
1426     pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1427     pm->set = set;
1428     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1429                                       set->content->pending_mutations_tail,
1430                                       pm);
1431     return;
1432   }
1433   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1434               "Executing mutation on set\n");
1435   execute_mutation (set,
1436                     msg);
1437 }
1438
1439
1440 /**
1441  * Advance the current generation of a set,
1442  * adding exclusion ranges if necessary.
1443  *
1444  * @param set the set where we want to advance the generation
1445  */
1446 static void
1447 advance_generation (struct Set *set)
1448 {
1449   struct GenerationRange r;
1450
1451   if (set->current_generation == set->content->latest_generation)
1452   {
1453     set->content->latest_generation++;
1454     set->current_generation++;
1455     return;
1456   }
1457
1458   GNUNET_assert (set->current_generation < set->content->latest_generation);
1459
1460   r.start = set->current_generation + 1;
1461   r.end = set->content->latest_generation + 1;
1462   set->content->latest_generation = r.end;
1463   set->current_generation = r.end;
1464   GNUNET_array_append (set->excluded_generations,
1465                        set->excluded_generations_size,
1466                        r);
1467 }
1468
1469
1470 /**
1471  * Called when a client wants to initiate a set operation with another
1472  * peer.  Initiates the CADET connection to the listener and sends the
1473  * request.
1474  *
1475  * @param cls client that sent the message
1476  * @param msg message sent by the client
1477  * @return #GNUNET_OK if the message is well-formed
1478  */
1479 static int
1480 check_client_evaluate (void *cls,
1481                         const struct GNUNET_SET_EvaluateMessage *msg)
1482 {
1483   /* FIXME: suboptimal, even if the context below could be NULL,
1484      there are malformed messages this does not check for... */
1485   return GNUNET_OK;
1486 }
1487
1488
1489 /**
1490  * Called when a client wants to initiate a set operation with another
1491  * peer.  Initiates the CADET connection to the listener and sends the
1492  * request.
1493  *
1494  * @param cls client that sent the message
1495  * @param msg message sent by the client
1496  */
1497 static void
1498 handle_client_evaluate (void *cls,
1499                         const struct GNUNET_SET_EvaluateMessage *msg)
1500 {
1501   struct ClientState *cs = cls;
1502   struct Operation *op = GNUNET_new (struct Operation);
1503   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1504     GNUNET_MQ_hd_var_size (incoming_msg,
1505                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1506                            struct OperationRequestMessage,
1507                            op),
1508     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1509                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1510                            struct IBFMessage,
1511                            op),
1512     GNUNET_MQ_hd_var_size (union_p2p_elements,
1513                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1514                            struct GNUNET_SET_ElementMessage,
1515                            op),
1516     GNUNET_MQ_hd_var_size (union_p2p_offer,
1517                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1518                            struct GNUNET_MessageHeader,
1519                            op),
1520     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1521                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1522                            struct InquiryMessage,
1523                            op),
1524     GNUNET_MQ_hd_var_size (union_p2p_demand,
1525                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1526                            struct GNUNET_MessageHeader,
1527                            op),
1528     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1529                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1530                              struct GNUNET_MessageHeader,
1531                              op),
1532     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1533                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1534                              struct GNUNET_MessageHeader,
1535                              op),
1536     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1537                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1538                              struct GNUNET_MessageHeader,
1539                              op),
1540     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1541                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1542                              struct GNUNET_MessageHeader,
1543                              op),
1544     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1545                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1546                            struct StrataEstimatorMessage,
1547                            op),
1548     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1549                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1550                            struct StrataEstimatorMessage,
1551                            op),
1552     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1553                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1554                            struct GNUNET_SET_ElementMessage,
1555                            op),
1556     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1557                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1558                              struct IntersectionElementInfoMessage,
1559                              op),
1560     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1561                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1562                            struct BFMessage,
1563                            op),
1564     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1565                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1566                              struct IntersectionDoneMessage,
1567                              op),
1568     GNUNET_MQ_handler_end ()
1569   };
1570   struct Set *set;
1571   const struct GNUNET_MessageHeader *context;
1572
1573   if (NULL == (set = cs->set))
1574   {
1575     GNUNET_break (0);
1576     GNUNET_free (op);
1577     GNUNET_SERVICE_client_drop (cs->client);
1578     return;
1579   }
1580   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1581                                        UINT32_MAX);
1582   op->peer = msg->target_peer;
1583   op->result_mode = ntohl (msg->result_mode);
1584   op->client_request_id = ntohl (msg->request_id);
1585   op->byzantine = msg->byzantine;
1586   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1587   op->force_full = msg->force_full;
1588   op->force_delta = msg->force_delta;
1589   context = GNUNET_MQ_extract_nested_mh (msg);
1590
1591   /* Advance generation values, so that
1592      mutations won't interfer with the running operation. */
1593   op->set = set;
1594   op->generation_created = set->current_generation;
1595   advance_generation (set);
1596   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1597                                set->ops_tail,
1598                                op);
1599   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1600               "Creating new CADET channel to port %s for set operation type %u\n",
1601               GNUNET_h2s (&msg->app_id),
1602               set->operation);
1603   op->channel = GNUNET_CADET_channel_create (cadet,
1604                                              op,
1605                                              &msg->target_peer,
1606                                              &msg->app_id,
1607                                              GNUNET_CADET_OPTION_RELIABLE,
1608                                              &channel_window_cb,
1609                                              &channel_end_cb,
1610                                              cadet_handlers);
1611   op->mq = GNUNET_CADET_get_mq (op->channel);
1612   op->state = set->vt->evaluate (op,
1613                                  context);
1614   if (NULL == op->state)
1615   {
1616     GNUNET_break (0);
1617     GNUNET_SERVICE_client_drop (cs->client);
1618     return;
1619   }
1620   GNUNET_SERVICE_client_continue (cs->client);
1621 }
1622
1623
1624 /**
1625  * Handle an ack from a client, and send the next element. Note
1626  * that we only expect acks for set elements, not after the
1627  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1628  *
1629  * @param cls client the client
1630  * @param ack the message
1631  */
1632 static void
1633 handle_client_iter_ack (void *cls,
1634                         const struct GNUNET_SET_IterAckMessage *ack)
1635 {
1636   struct ClientState *cs = cls;
1637   struct Set *set;
1638
1639   if (NULL == (set = cs->set))
1640   {
1641     /* client without a set acknowledged receiving a value */
1642     GNUNET_break (0);
1643     GNUNET_SERVICE_client_drop (cs->client);
1644     return;
1645   }
1646   if (NULL == set->iter)
1647   {
1648     /* client sent an ack, but we were not expecting one (as
1649        set iteration has finished) */
1650     GNUNET_break (0);
1651     GNUNET_SERVICE_client_drop (cs->client);
1652     return;
1653   }
1654   GNUNET_SERVICE_client_continue (cs->client);
1655   if (ntohl (ack->send_more))
1656   {
1657     send_client_element (set);
1658   }
1659   else
1660   {
1661     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1662     set->iter = NULL;
1663     set->iteration_id++;
1664   }
1665 }
1666
1667
1668 /**
1669  * Handle a request from the client to copy a set.
1670  *
1671  * @param cls the client
1672  * @param mh the message
1673  */
1674 static void
1675 handle_client_copy_lazy_prepare (void *cls,
1676                                  const struct GNUNET_MessageHeader *mh)
1677 {
1678   struct ClientState *cs = cls;
1679   struct Set *set;
1680   struct LazyCopyRequest *cr;
1681   struct GNUNET_MQ_Envelope *ev;
1682   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1683
1684   if (NULL == (set = cs->set))
1685   {
1686     /* client without a set requested an operation */
1687     GNUNET_break (0);
1688     GNUNET_SERVICE_client_drop (cs->client);
1689     return;
1690   }
1691   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1692               "Client requested creation of lazy copy\n");
1693   cr = GNUNET_new (struct LazyCopyRequest);
1694   cr->cookie = ++lazy_copy_cookie;
1695   cr->source_set = set;
1696   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1697                                lazy_copy_tail,
1698                                cr);
1699   ev = GNUNET_MQ_msg (resp_msg,
1700                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1701   resp_msg->cookie = cr->cookie;
1702   GNUNET_MQ_send (set->cs->mq,
1703                   ev);
1704   GNUNET_SERVICE_client_continue (cs->client);
1705 }
1706
1707
1708 /**
1709  * Handle a request from the client to connect to a copy of a set.
1710  *
1711  * @param cls the client
1712  * @param msg the message
1713  */
1714 static void
1715 handle_client_copy_lazy_connect (void *cls,
1716                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1717 {
1718   struct ClientState *cs = cls;
1719   struct LazyCopyRequest *cr;
1720   struct Set *set;
1721   int found;
1722
1723   if (NULL != cs->set)
1724   {
1725     /* There can only be one set per client */
1726     GNUNET_break (0);
1727     GNUNET_SERVICE_client_drop (cs->client);
1728     return;
1729   }
1730   found = GNUNET_NO;
1731   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1732   {
1733     if (cr->cookie == msg->cookie)
1734     {
1735       found = GNUNET_YES;
1736       break;
1737     }
1738   }
1739   if (GNUNET_NO == found)
1740   {
1741     /* client asked for copy with cookie we don't know */
1742     GNUNET_break (0);
1743     GNUNET_SERVICE_client_drop (cs->client);
1744     return;
1745   }
1746   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1747                                lazy_copy_tail,
1748                                cr);
1749   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1750               "Client %p requested use of lazy copy\n",
1751               cs);
1752   set = GNUNET_new (struct Set);
1753   switch (cr->source_set->operation)
1754   {
1755   case GNUNET_SET_OPERATION_INTERSECTION:
1756     set->vt = _GSS_intersection_vt ();
1757     break;
1758   case GNUNET_SET_OPERATION_UNION:
1759     set->vt = _GSS_union_vt ();
1760     break;
1761   default:
1762     GNUNET_assert (0);
1763     return;
1764   }
1765
1766   if (NULL == set->vt->copy_state)
1767   {
1768     /* Lazy copy not supported for this set operation */
1769     GNUNET_break (0);
1770     GNUNET_free (set);
1771     GNUNET_free (cr);
1772     GNUNET_SERVICE_client_drop (cs->client);
1773     return;
1774   }
1775
1776   set->operation = cr->source_set->operation;
1777   set->state = set->vt->copy_state (cr->source_set->state);
1778   set->content = cr->source_set->content;
1779   set->content->refcount++;
1780
1781   set->current_generation = cr->source_set->current_generation;
1782   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1783   set->excluded_generations
1784     = GNUNET_memdup (cr->source_set->excluded_generations,
1785                      set->excluded_generations_size * sizeof (struct GenerationRange));
1786
1787   /* Advance the generation of the new set, so that mutations to the
1788      of the cloned set and the source set are independent. */
1789   advance_generation (set);
1790   set->cs = cs;
1791   cs->set = set;
1792   GNUNET_free (cr);
1793   GNUNET_SERVICE_client_continue (cs->client);
1794 }
1795
1796
1797 /**
1798  * Handle a request from the client to cancel a running set operation.
1799  *
1800  * @param cls the client
1801  * @param msg the message
1802  */
1803 static void
1804 handle_client_cancel (void *cls,
1805                       const struct GNUNET_SET_CancelMessage *msg)
1806 {
1807   struct ClientState *cs = cls;
1808   struct Set *set;
1809   struct Operation *op;
1810   int found;
1811
1812   if (NULL == (set = cs->set))
1813   {
1814     /* client without a set requested an operation */
1815     GNUNET_break (0);
1816     GNUNET_SERVICE_client_drop (cs->client);
1817     return;
1818   }
1819   found = GNUNET_NO;
1820   for (op = set->ops_head; NULL != op; op = op->next)
1821   {
1822     if (op->client_request_id == ntohl (msg->request_id))
1823     {
1824       found = GNUNET_YES;
1825       break;
1826     }
1827   }
1828   if (GNUNET_NO == found)
1829   {
1830     /* It may happen that the operation was already destroyed due to
1831      * the other peer disconnecting.  The client may not know about this
1832      * yet and try to cancel the (just barely non-existent) operation.
1833      * So this is not a hard error.
1834      */
1835     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1836                 "Client canceled non-existent op %u\n",
1837                 (uint32_t) ntohl (msg->request_id));
1838   }
1839   else
1840   {
1841     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1842                 "Client requested cancel for op %u\n",
1843                 (uint32_t) ntohl (msg->request_id));
1844     _GSS_operation_destroy (op,
1845                             GNUNET_YES);
1846   }
1847   GNUNET_SERVICE_client_continue (cs->client);
1848 }
1849
1850
1851 /**
1852  * Handle a request from the client to accept a set operation that
1853  * came from a remote peer.  We forward the accept to the associated
1854  * operation for handling
1855  *
1856  * @param cls the client
1857  * @param msg the message
1858  */
1859 static void
1860 handle_client_accept (void *cls,
1861                       const struct GNUNET_SET_AcceptMessage *msg)
1862 {
1863   struct ClientState *cs = cls;
1864   struct Set *set;
1865   struct Operation *op;
1866   struct GNUNET_SET_ResultMessage *result_message;
1867   struct GNUNET_MQ_Envelope *ev;
1868   struct Listener *listener;
1869
1870   if (NULL == (set = cs->set))
1871   {
1872     /* client without a set requested to accept */
1873     GNUNET_break (0);
1874     GNUNET_SERVICE_client_drop (cs->client);
1875     return;
1876   }
1877   op = get_incoming (ntohl (msg->accept_reject_id));
1878   if (NULL == op)
1879   {
1880     /* It is not an error if the set op does not exist -- it may
1881      * have been destroyed when the partner peer disconnected. */
1882     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1883                 "Client %p accepted request %u of listener %p that is no longer active\n",
1884                 cs,
1885                 ntohl (msg->accept_reject_id),
1886                 cs->listener);
1887     ev = GNUNET_MQ_msg (result_message,
1888                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1889     result_message->request_id = msg->request_id;
1890     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1891     GNUNET_MQ_send (set->cs->mq,
1892                     ev);
1893     GNUNET_SERVICE_client_continue (cs->client);
1894     return;
1895   }
1896   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1897               "Client accepting request %u\n",
1898               (uint32_t) ntohl (msg->accept_reject_id));
1899   listener = op->listener;
1900   op->listener = NULL;
1901   GNUNET_CONTAINER_DLL_remove (listener->op_head,
1902                                listener->op_tail,
1903                                op);
1904   op->set = set;
1905   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1906                                set->ops_tail,
1907                                op);
1908   op->client_request_id = ntohl (msg->request_id);
1909   op->result_mode = ntohl (msg->result_mode);
1910   op->byzantine = msg->byzantine;
1911   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1912   op->force_full = msg->force_full;
1913   op->force_delta = msg->force_delta;
1914
1915   /* Advance generation values, so that future mutations do not
1916      interfer with the running operation. */
1917   op->generation_created = set->current_generation;
1918   advance_generation (set);
1919   GNUNET_assert (NULL == op->state);
1920   op->state = set->vt->accept (op);
1921   if (NULL == op->state)
1922   {
1923     GNUNET_break (0);
1924     GNUNET_SERVICE_client_drop (cs->client);
1925     return;
1926   }
1927   /* Now allow CADET to continue, as we did not do this in
1928      #handle_incoming_msg (as we wanted to first see if the
1929      local client would accept the request). */
1930   GNUNET_CADET_receive_done (op->channel);
1931   GNUNET_SERVICE_client_continue (cs->client);
1932 }
1933
1934
1935 /**
1936  * Called to clean up, after a shutdown has been requested.
1937  *
1938  * @param cls closure, NULL
1939  */
1940 static void
1941 shutdown_task (void *cls)
1942 {
1943   /* Delay actual shutdown to allow service to disconnect clients */
1944   in_shutdown = GNUNET_YES;
1945   if (0 == num_clients)
1946   {
1947     if (NULL != cadet)
1948     {
1949       GNUNET_CADET_disconnect (cadet);
1950       cadet = NULL;
1951     }
1952   }
1953   GNUNET_STATISTICS_destroy (_GSS_statistics,
1954                              GNUNET_YES);
1955   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1956               "handled shutdown request\n");
1957 }
1958
1959
1960 /**
1961  * Function called by the service's run
1962  * method to run service-specific setup code.
1963  *
1964  * @param cls closure
1965  * @param cfg configuration to use
1966  * @param service the initialized service
1967  */
1968 static void
1969 run (void *cls,
1970      const struct GNUNET_CONFIGURATION_Handle *cfg,
1971      struct GNUNET_SERVICE_Handle *service)
1972 {
1973   /* FIXME: need to modify SERVICE (!) API to allow
1974      us to run a shutdown task *after* clients were
1975      forcefully disconnected! */
1976   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1977                                  NULL);
1978   _GSS_statistics = GNUNET_STATISTICS_create ("set",
1979                                               cfg);
1980   cadet = GNUNET_CADET_connect (cfg);
1981   if (NULL == cadet)
1982   {
1983     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1984                 _("Could not connect to CADET service\n"));
1985     GNUNET_SCHEDULER_shutdown ();
1986     return;
1987   }
1988 }
1989
1990
1991 /**
1992  * Define "main" method using service macro.
1993  */
1994 GNUNET_SERVICE_MAIN
1995 ("set",
1996  GNUNET_SERVICE_OPTION_NONE,
1997  &run,
1998  &client_connect_cb,
1999  &client_disconnect_cb,
2000  NULL,
2001  GNUNET_MQ_hd_fixed_size (client_accept,
2002                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2003                           struct GNUNET_SET_AcceptMessage,
2004                           NULL),
2005  GNUNET_MQ_hd_fixed_size (client_iter_ack,
2006                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2007                           struct GNUNET_SET_IterAckMessage,
2008                           NULL),
2009  GNUNET_MQ_hd_var_size (client_mutation,
2010                         GNUNET_MESSAGE_TYPE_SET_ADD,
2011                         struct GNUNET_SET_ElementMessage,
2012                         NULL),
2013  GNUNET_MQ_hd_fixed_size (client_create_set,
2014                           GNUNET_MESSAGE_TYPE_SET_CREATE,
2015                           struct GNUNET_SET_CreateMessage,
2016                           NULL),
2017  GNUNET_MQ_hd_fixed_size (client_iterate,
2018                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2019                           struct GNUNET_MessageHeader,
2020                           NULL),
2021  GNUNET_MQ_hd_var_size (client_evaluate,
2022                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2023                         struct GNUNET_SET_EvaluateMessage,
2024                         NULL),
2025  GNUNET_MQ_hd_fixed_size (client_listen,
2026                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
2027                           struct GNUNET_SET_ListenMessage,
2028                           NULL),
2029  GNUNET_MQ_hd_fixed_size (client_reject,
2030                           GNUNET_MESSAGE_TYPE_SET_REJECT,
2031                           struct GNUNET_SET_RejectMessage,
2032                           NULL),
2033  GNUNET_MQ_hd_var_size (client_mutation,
2034                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
2035                         struct GNUNET_SET_ElementMessage,
2036                         NULL),
2037  GNUNET_MQ_hd_fixed_size (client_cancel,
2038                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2039                           struct GNUNET_SET_CancelMessage,
2040                           NULL),
2041  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2042                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2043                           struct GNUNET_MessageHeader,
2044                           NULL),
2045  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2046                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2047                           struct GNUNET_SET_CopyLazyConnectMessage,
2048                           NULL),
2049  GNUNET_MQ_handler_end ());
2050
2051
2052 /* end of gnunet-service-set.c */