75122395dfc3d5af0ab9aaec2ff9735373e2d5ba
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14      
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file set/gnunet-service-set.c
20  * @brief two-peer set operations
21  * @author Florian Dold
22  * @author Christian Grothoff
23  */
24 #include "gnunet-service-set.h"
25 #include "gnunet-service-set_union.h"
26 #include "gnunet-service-set_intersection.h"
27 #include "gnunet-service-set_protocol.h"
28 #include "gnunet_statistics_service.h"
29
30 /**
31  * How long do we hold on to an incoming channel if there is
32  * no local listener before giving up?
33  */
34 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35
36
37 /**
38  * Lazy copy requests made by a client.
39  */
40 struct LazyCopyRequest
41 {
42   /**
43    * Kept in a DLL.
44    */
45   struct LazyCopyRequest *prev;
46
47   /**
48    * Kept in a DLL.
49    */
50   struct LazyCopyRequest *next;
51
52   /**
53    * Which set are we supposed to copy?
54    */
55   struct Set *source_set;
56
57   /**
58    * Cookie identifying the request.
59    */
60   uint32_t cookie;
61
62 };
63
64
65 /**
66  * A listener is inhabited by a client, and waits for evaluation
67  * requests from remote peers.
68  */
69 struct Listener
70 {
71   /**
72    * Listeners are held in a doubly linked list.
73    */
74   struct Listener *next;
75
76   /**
77    * Listeners are held in a doubly linked list.
78    */
79   struct Listener *prev;
80
81   /**
82    * Head of DLL of operations this listener is responsible for.
83    * Once the client has accepted/declined the operation, the
84    * operation is moved to the respective set's operation DLLS.
85    */
86   struct Operation *op_head;
87
88   /**
89    * Tail of DLL of operations this listener is responsible for.
90    * Once the client has accepted/declined the operation, the
91    * operation is moved to the respective set's operation DLLS.
92    */
93   struct Operation *op_tail;
94
95   /**
96    * Client that owns the listener.
97    * Only one client may own a listener.
98    */
99   struct ClientState *cs;
100
101   /**
102    * The port we are listening on with CADET.
103    */
104   struct GNUNET_CADET_Port *open_port;
105
106   /**
107    * Application ID for the operation, used to distinguish
108    * multiple operations of the same type with the same peer.
109    */
110   struct GNUNET_HashCode app_id;
111
112   /**
113    * The type of the operation.
114    */
115   enum GNUNET_SET_OperationType operation;
116 };
117
118
119 /**
120  * Handle to the cadet service, used to listen for and connect to
121  * remote peers.
122  */
123 static struct GNUNET_CADET_Handle *cadet;
124
125 /**
126  * DLL of lazy copy requests by this client.
127  */
128 static struct LazyCopyRequest *lazy_copy_head;
129
130 /**
131  * DLL of lazy copy requests by this client.
132  */
133 static struct LazyCopyRequest *lazy_copy_tail;
134
135 /**
136  * Generator for unique cookie we set per lazy copy request.
137  */
138 static uint32_t lazy_copy_cookie;
139
140 /**
141  * Statistics handle.
142  */
143 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
144
145 /**
146  * Listeners are held in a doubly linked list.
147  */
148 static struct Listener *listener_head;
149
150 /**
151  * Listeners are held in a doubly linked list.
152  */
153 static struct Listener *listener_tail;
154
155 /**
156  * Number of active clients.
157  */
158 static unsigned int num_clients;
159
160 /**
161  * Are we in shutdown? if #GNUNET_YES and the number of clients
162  * drops to zero, disconnect from CADET.
163  */
164 static int in_shutdown;
165
166 /**
167  * Counter for allocating unique IDs for clients, used to identify
168  * incoming operation requests from remote peers, that the client can
169  * choose to accept or refuse.  0 must not be used (reserved for
170  * uninitialized).
171  */
172 static uint32_t suggest_id;
173
174
175 /**
176  * Get the incoming socket associated with the given id.
177  *
178  * @param listener the listener to look in
179  * @param id id to look for
180  * @return the incoming socket associated with the id,
181  *         or NULL if there is none
182  */
183 static struct Operation *
184 get_incoming (uint32_t id)
185 {
186   for (struct Listener *listener = listener_head;
187        NULL != listener;
188        listener = listener->next)
189   {
190     for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191       if (op->suggest_id == id)
192         return op;
193   }
194   return NULL;
195 }
196
197
198 /**
199  * Destroy an incoming request from a remote peer
200  *
201  * @param op remote request to destroy
202  */
203 static void
204 incoming_destroy (struct Operation *op)
205 {
206   struct Listener *listener;
207   struct GNUNET_CADET_Channel *channel;
208
209   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
210               "Destroying incoming operation %p\n",
211               op);
212   if (NULL != (listener = op->listener))
213   {
214     GNUNET_CONTAINER_DLL_remove (listener->op_head,
215                                  listener->op_tail,
216                                  op);
217     op->listener = NULL;
218   }
219   if (NULL != op->timeout_task)
220   {
221     GNUNET_SCHEDULER_cancel (op->timeout_task);
222     op->timeout_task = NULL;
223   }
224   _GSS_operation_destroy2 (op);
225 }
226
227
228 /**
229  * Context for the #garbage_collect_cb().
230  */
231 struct GarbageContext
232 {
233
234   /**
235    * Map for which we are garbage collecting removed elements.
236    */
237   struct GNUNET_CONTAINER_MultiHashMap *map;
238
239   /**
240    * Lowest generation for which an operation is still pending.
241    */
242   unsigned int min_op_generation;
243
244   /**
245    * Largest generation for which an operation is still pending.
246    */
247   unsigned int max_op_generation;
248
249 };
250
251
252 /**
253  * Function invoked to check if an element can be removed from
254  * the set's history because it is no longer needed.
255  *
256  * @param cls the `struct GarbageContext *`
257  * @param key key of the element in the map
258  * @param value the `struct ElementEntry *`
259  * @return #GNUNET_OK (continue to iterate)
260  */
261 static int
262 garbage_collect_cb (void *cls,
263                     const struct GNUNET_HashCode *key,
264                     void *value)
265 {
266   //struct GarbageContext *gc = cls;
267   //struct ElementEntry *ee = value;
268
269   //if (GNUNET_YES != ee->removed)
270   //  return GNUNET_OK;
271   //if ( (gc->max_op_generation < ee->generation_added) ||
272   //     (ee->generation_removed > gc->min_op_generation) )
273   //{
274   //  GNUNET_assert (GNUNET_YES ==
275   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
276   //                                                       key,
277   //                                                       ee));
278   //  GNUNET_free (ee);
279   //}
280   return GNUNET_OK;
281 }
282
283
284 /**
285  * Collect and destroy elements that are not needed anymore, because
286  * their lifetime (as determined by their generation) does not overlap
287  * with any active set operation.
288  *
289  * @param set set to garbage collect
290  */
291 static void
292 collect_generation_garbage (struct Set *set)
293 {
294   struct GarbageContext gc;
295
296   gc.min_op_generation = UINT_MAX;
297   gc.max_op_generation = 0;
298   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
299   {
300     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
301                                        op->generation_created);
302     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
303                                        op->generation_created);
304   }
305   gc.map = set->content->elements;
306   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
307                                          &garbage_collect_cb,
308                                          &gc);
309 }
310
311
312 /**
313  * Is @a generation in the range of exclusions?
314  *
315  * @param generation generation to query
316  * @param excluded array of generations where the element is excluded
317  * @param excluded_size length of the @a excluded array
318  * @return #GNUNET_YES if @a generation is in any of the ranges
319  */
320 static int
321 is_excluded_generation (unsigned int generation,
322                         struct GenerationRange *excluded,
323                         unsigned int excluded_size)
324 {
325   for (unsigned int i = 0; i < excluded_size; i++)
326     if ( (generation >= excluded[i].start) &&
327          (generation < excluded[i].end) )
328       return GNUNET_YES;
329   return GNUNET_NO;
330 }
331
332
333 /**
334  * Is element @a ee part of the set during @a query_generation?
335  *
336  * @param ee element to test
337  * @param query_generation generation to query
338  * @param excluded array of generations where the element is excluded
339  * @param excluded_size length of the @a excluded array
340  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
341  */
342 static int
343 is_element_of_generation (struct ElementEntry *ee,
344                           unsigned int query_generation,
345                           struct GenerationRange *excluded,
346                           unsigned int excluded_size)
347 {
348   struct MutationEvent *mut;
349   int is_present;
350
351   GNUNET_assert (NULL != ee->mutations);
352   if (GNUNET_YES ==
353       is_excluded_generation (query_generation,
354                               excluded,
355                               excluded_size))
356   {
357     GNUNET_break (0);
358     return GNUNET_NO;
359   }
360
361   is_present = GNUNET_NO;
362
363   /* Could be made faster with binary search, but lists
364      are small, so why bother. */
365   for (unsigned int i = 0; i < ee->mutations_size; i++)
366   {
367     mut = &ee->mutations[i];
368
369     if (mut->generation > query_generation)
370     {
371       /* The mutation doesn't apply to our generation
372          anymore.  We can'b break here, since mutations aren't
373          sorted by generation. */
374       continue;
375     }
376
377     if (GNUNET_YES ==
378         is_excluded_generation (mut->generation,
379                                 excluded,
380                                 excluded_size))
381     {
382       /* The generation is excluded (because it belongs to another
383          fork via a lazy copy) and thus mutations aren't considered
384          for membership testing. */
385       continue;
386     }
387
388     /* This would be an inconsistency in how we manage mutations. */
389     if ( (GNUNET_YES == is_present) &&
390          (GNUNET_YES == mut->added) )
391       GNUNET_assert (0);
392     /* Likewise. */
393     if ( (GNUNET_NO == is_present) &&
394          (GNUNET_NO == mut->added) )
395       GNUNET_assert (0);
396
397     is_present = mut->added;
398   }
399
400   return is_present;
401 }
402
403
404 /**
405  * Is element @a ee part of the set used by @a op?
406  *
407  * @param ee element to test
408  * @param op operation the defines the set and its generation
409  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
410  */
411 int
412 _GSS_is_element_of_operation (struct ElementEntry *ee,
413                               struct Operation *op)
414 {
415   return is_element_of_generation (ee,
416                                    op->generation_created,
417                                    op->set->excluded_generations,
418                                    op->set->excluded_generations_size);
419 }
420
421
422 /**
423  * Destroy the given operation.  Used for any operation where both
424  * peers were known and that thus actually had a vt and channel.  Must
425  * not be used for operations where 'listener' is still set and we do
426  * not know the other peer.
427  *
428  * Call the implementation-specific cancel function of the operation.
429  * Disconnects from the remote peer.  Does not disconnect the client,
430  * as there may be multiple operations per set.
431  *
432  * @param op operation to destroy
433  * @param gc #GNUNET_YES to perform garbage collection on the set
434  */
435 void
436 _GSS_operation_destroy (struct Operation *op,
437                         int gc)
438 {
439   struct Set *set = op->set;
440   struct GNUNET_CADET_Channel *channel;
441
442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443               "Destroying operation %p\n",
444               op);
445   GNUNET_assert (NULL == op->listener);
446   if (NULL != op->state)
447   {
448     set->vt->cancel (op);
449     op->state = NULL;
450   }
451   if (NULL != set)
452   {
453     GNUNET_CONTAINER_DLL_remove (set->ops_head,
454                                  set->ops_tail,
455                                  op);
456     op->set = NULL;
457   }
458   if (NULL != op->context_msg)
459   {
460     GNUNET_free (op->context_msg);
461     op->context_msg = NULL;
462   }
463   if (NULL != (channel = op->channel))
464   {
465     /* This will free op; called conditionally as this helper function
466        is also called from within the channel disconnect handler. */
467     op->channel = NULL;
468     GNUNET_CADET_channel_destroy (channel);
469   }
470   if ( (NULL != set) &&
471        (GNUNET_YES == gc) )
472     collect_generation_garbage (set);
473   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
474    * there was a channel end handler that will free 'op' on the call stack. */
475 }
476
477
478 /**
479  * Callback called when a client connects to the service.
480  *
481  * @param cls closure for the service
482  * @param c the new client that connected to the service
483  * @param mq the message queue used to send messages to the client
484  * @return @a `struct ClientState`
485  */
486 static void *
487 client_connect_cb (void *cls,
488                    struct GNUNET_SERVICE_Client *c,
489                    struct GNUNET_MQ_Handle *mq)
490 {
491   struct ClientState *cs;
492
493   num_clients++;
494   cs = GNUNET_new (struct ClientState);
495   cs->client = c;
496   cs->mq = mq;
497   return cs;
498 }
499
500
501 /**
502  * Iterator over hash map entries to free element entries.
503  *
504  * @param cls closure
505  * @param key current key code
506  * @param value a `struct ElementEntry *` to be free'd
507  * @return #GNUNET_YES (continue to iterate)
508  */
509 static int
510 destroy_elements_iterator (void *cls,
511                            const struct GNUNET_HashCode *key,
512                            void *value)
513 {
514   struct ElementEntry *ee = value;
515
516   GNUNET_free_non_null (ee->mutations);
517   GNUNET_free (ee);
518   return GNUNET_YES;
519 }
520
521
522 /**
523  * Clean up after a client has disconnected
524  *
525  * @param cls closure, unused
526  * @param client the client to clean up after
527  * @param internal_cls the `struct ClientState`
528  */
529 static void
530 client_disconnect_cb (void *cls,
531                       struct GNUNET_SERVICE_Client *client,
532                       void *internal_cls)
533 {
534   struct ClientState *cs = internal_cls;
535   struct Operation *op;
536   struct Listener *listener;
537   struct Set *set;
538
539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540               "Client disconnected, cleaning up\n");
541   if (NULL != (set = cs->set))
542   {
543     struct SetContent *content = set->content;
544     struct PendingMutation *pm;
545     struct PendingMutation *pm_current;
546     struct LazyCopyRequest *lcr;
547
548     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
549                 "Destroying client's set\n");
550     /* Destroy pending set operations */
551     while (NULL != set->ops_head)
552       _GSS_operation_destroy (set->ops_head,
553                               GNUNET_NO);
554
555     /* Destroy operation-specific state */
556     GNUNET_assert (NULL != set->state);
557     set->vt->destroy_set (set->state);
558     set->state = NULL;
559
560     /* Clean up ongoing iterations */
561     if (NULL != set->iter)
562     {
563       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
564       set->iter = NULL;
565       set->iteration_id++;
566     }
567
568     /* discard any pending mutations that reference this set */
569     pm = content->pending_mutations_head;
570     while (NULL != pm)
571     {
572       pm_current = pm;
573       pm = pm->next;
574       if (pm_current->set == set)
575       {
576         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
577                                      content->pending_mutations_tail,
578                                      pm_current);
579         GNUNET_free (pm_current);
580       }
581     }
582
583     /* free set content (or at least decrement RC) */
584     set->content = NULL;
585     GNUNET_assert (0 != content->refcount);
586     content->refcount--;
587     if (0 == content->refcount)
588     {
589       GNUNET_assert (NULL != content->elements);
590       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
591                                              &destroy_elements_iterator,
592                                              NULL);
593       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
594       content->elements = NULL;
595       GNUNET_free (content);
596     }
597     GNUNET_free_non_null (set->excluded_generations);
598     set->excluded_generations = NULL;
599
600     /* remove set from pending copy requests */
601     lcr = lazy_copy_head;
602     while (NULL != lcr)
603     {
604       struct LazyCopyRequest *lcr_current = lcr;
605
606       lcr = lcr->next;
607       if (lcr_current->source_set == set)
608       {
609         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
610                                      lazy_copy_tail,
611                                      lcr_current);
612         GNUNET_free (lcr_current);
613       }
614     }
615     GNUNET_free (set);
616   }
617
618   if (NULL != (listener = cs->listener))
619   {
620     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621                 "Destroying client's listener\n");
622     GNUNET_CADET_close_port (listener->open_port);
623     listener->open_port = NULL;
624     while (NULL != (op = listener->op_head))
625     {
626       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
627                   "Destroying incoming operation `%u' from peer `%s'\n",
628                   (unsigned int) op->client_request_id,
629                   GNUNET_i2s (&op->peer));
630       incoming_destroy (op);
631     }
632     GNUNET_CONTAINER_DLL_remove (listener_head,
633                                  listener_tail,
634                                  listener);
635     GNUNET_free (listener);
636   }
637   GNUNET_free (cs);
638   num_clients--;
639   if ( (GNUNET_YES == in_shutdown) &&
640        (0 == num_clients) )
641   {
642     if (NULL != cadet)
643     {
644       GNUNET_CADET_disconnect (cadet);
645       cadet = NULL;
646     }
647   }
648 }
649
650
651 /**
652  * Check a request for a set operation from another peer.
653  *
654  * @param cls the operation state
655  * @param msg the received message
656  * @return #GNUNET_OK if the channel should be kept alive,
657  *         #GNUNET_SYSERR to destroy the channel
658  */
659 static int
660 check_incoming_msg (void *cls,
661                     const struct OperationRequestMessage *msg)
662 {
663   struct Operation *op = cls;
664   struct Listener *listener = op->listener;
665   const struct GNUNET_MessageHeader *nested_context;
666
667   /* double operation request */
668   if (0 != op->suggest_id)
669   {
670     GNUNET_break_op (0);
671     return GNUNET_SYSERR;
672   }
673   /* This should be equivalent to the previous condition, but can't hurt to check twice */
674   if (NULL == op->listener)
675   {
676     GNUNET_break (0);
677     return GNUNET_SYSERR;
678   }
679   if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
680   {
681     GNUNET_break_op (0);
682     return GNUNET_SYSERR;
683   }
684   nested_context = GNUNET_MQ_extract_nested_mh (msg);
685   if ( (NULL != nested_context) &&
686        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
687   {
688     GNUNET_break_op (0);
689     return GNUNET_SYSERR;
690   }
691   return GNUNET_OK;
692 }
693
694
695 /**
696  * Handle a request for a set operation from another peer.  Checks if we
697  * have a listener waiting for such a request (and in that case initiates
698  * asking the listener about accepting the connection). If no listener
699  * is waiting, we queue the operation request in hope that a listener
700  * shows up soon (before timeout).
701  *
702  * This msg is expected as the first and only msg handled through the
703  * non-operation bound virtual table, acceptance of this operation replaces
704  * our virtual table and subsequent msgs would be routed differently (as
705  * we then know what type of operation this is).
706  *
707  * @param cls the operation state
708  * @param msg the received message
709  * @return #GNUNET_OK if the channel should be kept alive,
710  *         #GNUNET_SYSERR to destroy the channel
711  */
712 static void
713 handle_incoming_msg (void *cls,
714                      const struct OperationRequestMessage *msg)
715 {
716   struct Operation *op = cls;
717   struct Listener *listener = op->listener;
718   const struct GNUNET_MessageHeader *nested_context;
719   struct GNUNET_MQ_Envelope *env;
720   struct GNUNET_SET_RequestMessage *cmsg;
721
722   nested_context = GNUNET_MQ_extract_nested_mh (msg);
723   /* Make a copy of the nested_context (application-specific context
724      information that is opaque to set) so we can pass it to the
725      listener later on */
726   if (NULL != nested_context)
727     op->context_msg = GNUNET_copy_message (nested_context);
728   op->remote_element_count = ntohl (msg->element_count);
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "Received P2P operation request (op %u, port %s) for active listener\n",
731               (uint32_t) ntohl (msg->operation),
732               GNUNET_h2s (&op->listener->app_id));
733   GNUNET_assert (0 == op->suggest_id);
734   if (0 == suggest_id)
735     suggest_id++;
736   op->suggest_id = suggest_id++;
737   GNUNET_assert (NULL != op->timeout_task);
738   GNUNET_SCHEDULER_cancel (op->timeout_task);
739   op->timeout_task = NULL;
740   env = GNUNET_MQ_msg_nested_mh (cmsg,
741                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
742                                  op->context_msg);
743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744               "Suggesting incoming request with accept id %u to listener %p of client %p\n",
745               op->suggest_id,
746               listener,
747               listener->cs);
748   cmsg->accept_id = htonl (op->suggest_id);
749   cmsg->peer_id = op->peer;
750   GNUNET_MQ_send (listener->cs->mq,
751                   env);
752   /* NOTE: GNUNET_CADET_receive_done() will be called in
753      #handle_client_accept() */
754 }
755
756
757 /**
758  * Add an element to @a set as specified by @a msg
759  *
760  * @param set set to manipulate
761  * @param msg message specifying the change
762  */
763 static void
764 execute_add (struct Set *set,
765              const struct GNUNET_SET_ElementMessage *msg)
766 {
767   struct GNUNET_SET_Element el;
768   struct ElementEntry *ee;
769   struct GNUNET_HashCode hash;
770
771   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
772   el.size = ntohs (msg->header.size) - sizeof (*msg);
773   el.data = &msg[1];
774   el.element_type = ntohs (msg->element_type);
775   GNUNET_SET_element_hash (&el,
776                            &hash);
777   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
778                                           &hash);
779   if (NULL == ee)
780   {
781     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782                 "Client inserts element %s of size %u\n",
783                 GNUNET_h2s (&hash),
784                 el.size);
785     ee = GNUNET_malloc (el.size + sizeof (*ee));
786     ee->element.size = el.size;
787     GNUNET_memcpy (&ee[1],
788             el.data,
789             el.size);
790     ee->element.data = &ee[1];
791     ee->element.element_type = el.element_type;
792     ee->remote = GNUNET_NO;
793     ee->mutations = NULL;
794     ee->mutations_size = 0;
795     ee->element_hash = hash;
796     GNUNET_break (GNUNET_YES ==
797                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
798                                                      &ee->element_hash,
799                                                      ee,
800                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
801   }
802   else if (GNUNET_YES ==
803            is_element_of_generation (ee,
804                                      set->current_generation,
805                                      set->excluded_generations,
806                                      set->excluded_generations_size))
807   {
808     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809                 "Client inserted element %s of size %u twice (ignored)\n",
810                 GNUNET_h2s (&hash),
811                 el.size);
812
813     /* same element inserted twice */
814     return;
815   }
816
817   {
818     struct MutationEvent mut = {
819       .generation = set->current_generation,
820       .added = GNUNET_YES
821     };
822     GNUNET_array_append (ee->mutations,
823                          ee->mutations_size,
824                          mut);
825   }
826   set->vt->add (set->state,
827                 ee);
828 }
829
830
831 /**
832  * Remove an element from @a set as specified by @a msg
833  *
834  * @param set set to manipulate
835  * @param msg message specifying the change
836  */
837 static void
838 execute_remove (struct Set *set,
839                 const struct GNUNET_SET_ElementMessage *msg)
840 {
841   struct GNUNET_SET_Element el;
842   struct ElementEntry *ee;
843   struct GNUNET_HashCode hash;
844
845   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
846   el.size = ntohs (msg->header.size) - sizeof (*msg);
847   el.data = &msg[1];
848   el.element_type = ntohs (msg->element_type);
849   GNUNET_SET_element_hash (&el, &hash);
850   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
851                                           &hash);
852   if (NULL == ee)
853   {
854     /* Client tried to remove non-existing element. */
855     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856                 "Client removes non-existing element of size %u\n",
857                 el.size);
858     return;
859   }
860   if (GNUNET_NO ==
861       is_element_of_generation (ee,
862                                 set->current_generation,
863                                 set->excluded_generations,
864                                 set->excluded_generations_size))
865   {
866     /* Client tried to remove element twice */
867     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868                 "Client removed element of size %u twice (ignored)\n",
869                 el.size);
870     return;
871   }
872   else
873   {
874     struct MutationEvent mut = {
875       .generation = set->current_generation,
876       .added = GNUNET_NO
877     };
878
879     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880                 "Client removes element of size %u\n",
881                 el.size);
882
883     GNUNET_array_append (ee->mutations,
884                          ee->mutations_size,
885                          mut);
886   }
887   set->vt->remove (set->state,
888                    ee);
889 }
890
891
892 /**
893  * Perform a mutation on a set as specified by the @a msg
894  *
895  * @param set the set to mutate
896  * @param msg specification of what to change
897  */
898 static void
899 execute_mutation (struct Set *set,
900                   const struct GNUNET_SET_ElementMessage *msg)
901 {
902   switch (ntohs (msg->header.type))
903   {
904     case GNUNET_MESSAGE_TYPE_SET_ADD:
905       execute_add (set, msg);
906       break;
907     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
908       execute_remove (set, msg);
909       break;
910     default:
911       GNUNET_break (0);
912   }
913 }
914
915
916 /**
917  * Execute mutations that were delayed on a set because of
918  * pending operations.
919  *
920  * @param set the set to execute mutations on
921  */
922 static void
923 execute_delayed_mutations (struct Set *set)
924 {
925   struct PendingMutation *pm;
926
927   if (0 != set->content->iterator_count)
928     return; /* still cannot do this */
929   while (NULL != (pm = set->content->pending_mutations_head))
930   {
931     GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
932                                  set->content->pending_mutations_tail,
933                                  pm);
934     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935                 "Executing pending mutation on %p.\n",
936                 pm->set);
937     execute_mutation (pm->set,
938                       pm->msg);
939     GNUNET_free (pm->msg);
940     GNUNET_free (pm);
941   }
942 }
943
944
945 /**
946  * Send the next element of a set to the set's client.  The next element is given by
947  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
948  * are no more elements in the set.  The caller must ensure that the set's iterator is
949  * valid.
950  *
951  * The client will acknowledge each received element with a
952  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
953  * #handle_client_iter_ack() will then trigger the next transmission.
954  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
955  *
956  * @param set set that should send its next element to its client
957  */
958 static void
959 send_client_element (struct Set *set)
960 {
961   int ret;
962   struct ElementEntry *ee;
963   struct GNUNET_MQ_Envelope *ev;
964   struct GNUNET_SET_IterResponseMessage *msg;
965
966   GNUNET_assert (NULL != set->iter);
967   do {
968     ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
969                                                        NULL,
970                                                        (const void **) &ee);
971     if (GNUNET_NO == ret)
972     {
973       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974                   "Iteration on %p done.\n",
975                   set);
976       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
977       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
978       set->iter = NULL;
979       set->iteration_id++;
980       GNUNET_assert (set->content->iterator_count > 0);
981       set->content->iterator_count--;
982       execute_delayed_mutations (set);
983       GNUNET_MQ_send (set->cs->mq,
984                       ev);
985       return;
986     }
987     GNUNET_assert (NULL != ee);
988   } while (GNUNET_NO ==
989            is_element_of_generation (ee,
990                                      set->iter_generation,
991                                      set->excluded_generations,
992                                      set->excluded_generations_size));
993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994               "Sending iteration element on %p.\n",
995               set);
996   ev = GNUNET_MQ_msg_extra (msg,
997                             ee->element.size,
998                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
999   GNUNET_memcpy (&msg[1],
1000                  ee->element.data,
1001                  ee->element.size);
1002   msg->element_type = htons (ee->element.element_type);
1003   msg->iteration_id = htons (set->iteration_id);
1004   GNUNET_MQ_send (set->cs->mq,
1005                   ev);
1006 }
1007
1008
1009 /**
1010  * Called when a client wants to iterate the elements of a set.
1011  * Checks if we have a set associated with the client and if we
1012  * can right now start an iteration. If all checks out, starts
1013  * sending the elements of the set to the client.
1014  *
1015  * @param cls client that sent the message
1016  * @param m message sent by the client
1017  */
1018 static void
1019 handle_client_iterate (void *cls,
1020                        const struct GNUNET_MessageHeader *m)
1021 {
1022   struct ClientState *cs = cls;
1023   struct Set *set;
1024
1025   if (NULL == (set = cs->set))
1026   {
1027     /* attempt to iterate over a non existing set */
1028     GNUNET_break (0);
1029     GNUNET_SERVICE_client_drop (cs->client);
1030     return;
1031   }
1032   if (NULL != set->iter)
1033   {
1034     /* Only one concurrent iterate-action allowed per set */
1035     GNUNET_break (0);
1036     GNUNET_SERVICE_client_drop (cs->client);
1037     return;
1038   }
1039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040               "Iterating set %p in gen %u with %u content elements\n",
1041               (void *) set,
1042               set->current_generation,
1043               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1044   GNUNET_SERVICE_client_continue (cs->client);
1045   set->content->iterator_count++;
1046   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1047   set->iter_generation = set->current_generation;
1048   send_client_element (set);
1049 }
1050
1051
1052 /**
1053  * Called when a client wants to create a new set.  This is typically
1054  * the first request from a client, and includes the type of set
1055  * operation to be performed.
1056  *
1057  * @param cls client that sent the message
1058  * @param m message sent by the client
1059  */
1060 static void
1061 handle_client_create_set (void *cls,
1062                           const struct GNUNET_SET_CreateMessage *msg)
1063 {
1064   struct ClientState *cs = cls;
1065   struct Set *set;
1066
1067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068               "Client created new set (operation %u)\n",
1069               (uint32_t) ntohl (msg->operation));
1070   if (NULL != cs->set)
1071   {
1072     /* There can only be one set per client */
1073     GNUNET_break (0);
1074     GNUNET_SERVICE_client_drop (cs->client);
1075     return;
1076   }
1077   set = GNUNET_new (struct Set);
1078   switch (ntohl (msg->operation))
1079   {
1080   case GNUNET_SET_OPERATION_INTERSECTION:
1081     set->vt = _GSS_intersection_vt ();
1082     break;
1083   case GNUNET_SET_OPERATION_UNION:
1084     set->vt = _GSS_union_vt ();
1085     break;
1086   default:
1087     GNUNET_free (set);
1088     GNUNET_break (0);
1089     GNUNET_SERVICE_client_drop (cs->client);
1090     return;
1091   }
1092   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1093   set->state = set->vt->create ();
1094   if (NULL == set->state)
1095   {
1096     /* initialization failed (i.e. out of memory) */
1097     GNUNET_free (set);
1098     GNUNET_SERVICE_client_drop (cs->client);
1099     return;
1100   }
1101   set->content = GNUNET_new (struct SetContent);
1102   set->content->refcount = 1;
1103   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1104                                                                  GNUNET_YES);
1105   set->cs = cs;
1106   cs->set = set;
1107   GNUNET_SERVICE_client_continue (cs->client);
1108 }
1109
1110
1111 /**
1112  * Timeout happens iff:
1113  *  - we suggested an operation to our listener,
1114  *    but did not receive a response in time
1115  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1116  *
1117  * @param cls channel context
1118  * @param tc context information (why was this task triggered now)
1119  */
1120 static void
1121 incoming_timeout_cb (void *cls)
1122 {
1123   struct Operation *op = cls;
1124
1125   op->timeout_task = NULL;
1126   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127               "Remote peer's incoming request timed out\n");
1128   incoming_destroy (op);
1129 }
1130
1131
1132 /**
1133  * Method called whenever another peer has added us to a channel the
1134  * other peer initiated.  Only called (once) upon reception of data
1135  * from a channel we listen on.
1136  *
1137  * The channel context represents the operation itself and gets added
1138  * to a DLL, from where it gets looked up when our local listener
1139  * client responds to a proposed/suggested operation or connects and
1140  * associates with this operation.
1141  *
1142  * @param cls closure
1143  * @param channel new handle to the channel
1144  * @param source peer that started the channel
1145  * @return initial channel context for the channel
1146  *         returns NULL on error
1147  */
1148 static void *
1149 channel_new_cb (void *cls,
1150                 struct GNUNET_CADET_Channel *channel,
1151                 const struct GNUNET_PeerIdentity *source)
1152 {
1153   struct Listener *listener = cls;
1154   struct Operation *op;
1155
1156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157               "New incoming channel\n");
1158   op = GNUNET_new (struct Operation);
1159   op->listener = listener;
1160   op->peer = *source;
1161   op->channel = channel;
1162   op->mq = GNUNET_CADET_get_mq (op->channel);
1163   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1164                                        UINT32_MAX);
1165   op->timeout_task
1166     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1167                                     &incoming_timeout_cb,
1168                                     op);
1169   GNUNET_CONTAINER_DLL_insert (listener->op_head,
1170                                listener->op_tail,
1171                                op);
1172   return op;
1173 }
1174
1175
1176 /**
1177  * Function called whenever a channel is destroyed.  Should clean up
1178  * any associated state.  It must NOT call
1179  * GNUNET_CADET_channel_destroy() on the channel.
1180  *
1181  * The peer_disconnect function is part of a a virtual table set initially either
1182  * when a peer creates a new channel with us, or once we create
1183  * a new channel ourselves (evaluate).
1184  *
1185  * Once we know the exact type of operation (union/intersection), the vt is
1186  * replaced with an operation specific instance (_GSS_[op]_vt).
1187  *
1188  * @param channel_ctx place where local state associated
1189  *                   with the channel is stored
1190  * @param channel connection to the other end (henceforth invalid)
1191  */
1192 static void
1193 channel_end_cb (void *channel_ctx,
1194                 const struct GNUNET_CADET_Channel *channel)
1195 {
1196   struct Operation *op = channel_ctx;
1197
1198   op->channel = NULL;
1199   _GSS_operation_destroy2 (op);
1200 }
1201
1202
1203 /**
1204  * This function probably should not exist
1205  * and be replaced by inlining more specific
1206  * logic in the various places where it is called.
1207  */
1208 void
1209 _GSS_operation_destroy2 (struct Operation *op)
1210 {
1211   struct GNUNET_CADET_Channel *channel;
1212
1213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214               "channel_end_cb called\n");
1215   if (NULL != (channel = op->channel))
1216   {
1217     /* This will free op; called conditionally as this helper function
1218        is also called from within the channel disconnect handler. */
1219     op->channel = NULL;
1220     GNUNET_CADET_channel_destroy (channel);
1221   }
1222   if (NULL != op->listener)
1223     incoming_destroy (op);
1224   else if (NULL != op->set)
1225     op->set->vt->channel_death (op);
1226   else
1227     _GSS_operation_destroy (op,
1228                             GNUNET_YES);
1229   GNUNET_free (op);
1230 }
1231
1232
1233 /**
1234  * Function called whenever an MQ-channel's transmission window size changes.
1235  *
1236  * The first callback in an outgoing channel will be with a non-zero value
1237  * and will mean the channel is connected to the destination.
1238  *
1239  * For an incoming channel it will be called immediately after the
1240  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1241  *
1242  * @param cls Channel closure.
1243  * @param channel Connection to the other end (henceforth invalid).
1244  * @param window_size New window size. If the is more messages than buffer size
1245  *                    this value will be negative..
1246  */
1247 static void
1248 channel_window_cb (void *cls,
1249                    const struct GNUNET_CADET_Channel *channel,
1250                    int window_size)
1251 {
1252   /* FIXME: not implemented, we could do flow control here... */
1253 }
1254
1255
1256 /**
1257  * Called when a client wants to create a new listener.
1258  *
1259  * @param cls client that sent the message
1260  * @param msg message sent by the client
1261  */
1262 static void
1263 handle_client_listen (void *cls,
1264                       const struct GNUNET_SET_ListenMessage *msg)
1265 {
1266   struct ClientState *cs = cls;
1267   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1268     GNUNET_MQ_hd_var_size (incoming_msg,
1269                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1270                            struct OperationRequestMessage,
1271                            NULL),
1272     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1273                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1274                            struct IBFMessage,
1275                            NULL),
1276     GNUNET_MQ_hd_var_size (union_p2p_elements,
1277                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1278                            struct GNUNET_SET_ElementMessage,
1279                            NULL),
1280     GNUNET_MQ_hd_var_size (union_p2p_offer,
1281                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1282                            struct GNUNET_MessageHeader,
1283                            NULL),
1284     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1285                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1286                            struct InquiryMessage,
1287                            NULL),
1288     GNUNET_MQ_hd_var_size (union_p2p_demand,
1289                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1290                            struct GNUNET_MessageHeader,
1291                            NULL),
1292     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1293                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1294                              struct GNUNET_MessageHeader,
1295                              NULL),
1296     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1297                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1298                              struct GNUNET_MessageHeader,
1299                              NULL),
1300     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1301                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1302                              struct GNUNET_MessageHeader,
1303                              NULL),
1304     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1305                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1306                              struct GNUNET_MessageHeader,
1307                              NULL),
1308     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1309                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1310                            struct StrataEstimatorMessage,
1311                            NULL),
1312     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1313                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1314                            struct StrataEstimatorMessage,
1315                            NULL),
1316     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1317                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1318                            struct GNUNET_SET_ElementMessage,
1319                            NULL),
1320     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1321                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1322                              struct IntersectionElementInfoMessage,
1323                              NULL),
1324     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1325                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1326                            struct BFMessage,
1327                            NULL),
1328     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1329                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1330                              struct IntersectionDoneMessage,
1331                              NULL),
1332     GNUNET_MQ_handler_end ()
1333   };
1334   struct Listener *listener;
1335
1336   if (NULL != cs->listener)
1337   {
1338     /* max. one active listener per client! */
1339     GNUNET_break (0);
1340     GNUNET_SERVICE_client_drop (cs->client);
1341     return;
1342   }
1343   listener = GNUNET_new (struct Listener);
1344   listener->cs = cs;
1345   cs->listener = listener;
1346   listener->app_id = msg->app_id;
1347   listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1348   GNUNET_CONTAINER_DLL_insert (listener_head,
1349                                listener_tail,
1350                                listener);
1351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1352               "New listener created (op %u, port %s)\n",
1353               listener->operation,
1354               GNUNET_h2s (&listener->app_id));
1355   listener->open_port
1356     = GNUNET_CADET_open_port (cadet,
1357                               &msg->app_id,
1358                               &channel_new_cb,
1359                               listener,
1360                               &channel_window_cb,
1361                               &channel_end_cb,
1362                               cadet_handlers);
1363   GNUNET_SERVICE_client_continue (cs->client);
1364 }
1365
1366
1367 /**
1368  * Called when the listening client rejects an operation
1369  * request by another peer.
1370  *
1371  * @param cls client that sent the message
1372  * @param msg message sent by the client
1373  */
1374 static void
1375 handle_client_reject (void *cls,
1376                       const struct GNUNET_SET_RejectMessage *msg)
1377 {
1378   struct ClientState *cs = cls;
1379   struct Operation *op;
1380
1381   op = get_incoming (ntohl (msg->accept_reject_id));
1382   if (NULL == op)
1383   {
1384     /* no matching incoming operation for this reject;
1385        could be that the other peer already disconnected... */
1386     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1387                 "Client rejected unknown operation %u\n",
1388                 (unsigned int) ntohl (msg->accept_reject_id));
1389     GNUNET_SERVICE_client_continue (cs->client);
1390     return;
1391   }
1392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393               "Peer request (op %u, app %s) rejected by client\n",
1394               op->listener->operation,
1395               GNUNET_h2s (&cs->listener->app_id));
1396   _GSS_operation_destroy2 (op);
1397   GNUNET_SERVICE_client_continue (cs->client);
1398 }
1399
1400
1401 /**
1402  * Called when a client wants to add or remove an element to a set it inhabits.
1403  *
1404  * @param cls client that sent the message
1405  * @param msg message sent by the client
1406  */
1407 static int
1408 check_client_mutation (void *cls,
1409                        const struct GNUNET_SET_ElementMessage *msg)
1410 {
1411   /* NOTE: Technically, we should probably check with the
1412      block library whether the element we are given is well-formed */
1413   return GNUNET_OK;
1414 }
1415
1416
1417 /**
1418  * Called when a client wants to add or remove an element to a set it inhabits.
1419  *
1420  * @param cls client that sent the message
1421  * @param msg message sent by the client
1422  */
1423 static void
1424 handle_client_mutation (void *cls,
1425                         const struct GNUNET_SET_ElementMessage *msg)
1426 {
1427   struct ClientState *cs = cls;
1428   struct Set *set;
1429
1430   if (NULL == (set = cs->set))
1431   {
1432     /* client without a set requested an operation */
1433     GNUNET_break (0);
1434     GNUNET_SERVICE_client_drop (cs->client);
1435     return;
1436   }
1437   GNUNET_SERVICE_client_continue (cs->client);
1438
1439   if (0 != set->content->iterator_count)
1440   {
1441     struct PendingMutation *pm;
1442
1443     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1444                 "Scheduling mutation on set\n");
1445     pm = GNUNET_new (struct PendingMutation);
1446     pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1447     pm->set = set;
1448     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1449                                       set->content->pending_mutations_tail,
1450                                       pm);
1451     return;
1452   }
1453   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1454               "Executing mutation on set\n");
1455   execute_mutation (set,
1456                     msg);
1457 }
1458
1459
1460 /**
1461  * Advance the current generation of a set,
1462  * adding exclusion ranges if necessary.
1463  *
1464  * @param set the set where we want to advance the generation
1465  */
1466 static void
1467 advance_generation (struct Set *set)
1468 {
1469   struct GenerationRange r;
1470
1471   if (set->current_generation == set->content->latest_generation)
1472   {
1473     set->content->latest_generation++;
1474     set->current_generation++;
1475     return;
1476   }
1477
1478   GNUNET_assert (set->current_generation < set->content->latest_generation);
1479
1480   r.start = set->current_generation + 1;
1481   r.end = set->content->latest_generation + 1;
1482   set->content->latest_generation = r.end;
1483   set->current_generation = r.end;
1484   GNUNET_array_append (set->excluded_generations,
1485                        set->excluded_generations_size,
1486                        r);
1487 }
1488
1489
1490 /**
1491  * Called when a client wants to initiate a set operation with another
1492  * peer.  Initiates the CADET connection to the listener and sends the
1493  * request.
1494  *
1495  * @param cls client that sent the message
1496  * @param msg message sent by the client
1497  * @return #GNUNET_OK if the message is well-formed
1498  */
1499 static int
1500 check_client_evaluate (void *cls,
1501                         const struct GNUNET_SET_EvaluateMessage *msg)
1502 {
1503   /* FIXME: suboptimal, even if the context below could be NULL,
1504      there are malformed messages this does not check for... */
1505   return GNUNET_OK;
1506 }
1507
1508
1509 /**
1510  * Called when a client wants to initiate a set operation with another
1511  * peer.  Initiates the CADET connection to the listener and sends the
1512  * request.
1513  *
1514  * @param cls client that sent the message
1515  * @param msg message sent by the client
1516  */
1517 static void
1518 handle_client_evaluate (void *cls,
1519                         const struct GNUNET_SET_EvaluateMessage *msg)
1520 {
1521   struct ClientState *cs = cls;
1522   struct Operation *op = GNUNET_new (struct Operation);
1523   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1524     GNUNET_MQ_hd_var_size (incoming_msg,
1525                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1526                            struct OperationRequestMessage,
1527                            op),
1528     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1529                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1530                            struct IBFMessage,
1531                            op),
1532     GNUNET_MQ_hd_var_size (union_p2p_elements,
1533                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1534                            struct GNUNET_SET_ElementMessage,
1535                            op),
1536     GNUNET_MQ_hd_var_size (union_p2p_offer,
1537                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1538                            struct GNUNET_MessageHeader,
1539                            op),
1540     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1541                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1542                            struct InquiryMessage,
1543                            op),
1544     GNUNET_MQ_hd_var_size (union_p2p_demand,
1545                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1546                            struct GNUNET_MessageHeader,
1547                            op),
1548     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1549                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1550                              struct GNUNET_MessageHeader,
1551                              op),
1552     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1553                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1554                              struct GNUNET_MessageHeader,
1555                              op),
1556     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1557                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1558                              struct GNUNET_MessageHeader,
1559                              op),
1560     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1561                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1562                              struct GNUNET_MessageHeader,
1563                              op),
1564     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1565                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1566                            struct StrataEstimatorMessage,
1567                            op),
1568     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1569                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1570                            struct StrataEstimatorMessage,
1571                            op),
1572     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1573                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1574                            struct GNUNET_SET_ElementMessage,
1575                            op),
1576     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1577                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1578                              struct IntersectionElementInfoMessage,
1579                              op),
1580     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1581                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1582                            struct BFMessage,
1583                            op),
1584     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1585                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1586                              struct IntersectionDoneMessage,
1587                              op),
1588     GNUNET_MQ_handler_end ()
1589   };
1590   struct Set *set;
1591   const struct GNUNET_MessageHeader *context;
1592
1593   if (NULL == (set = cs->set))
1594   {
1595     GNUNET_break (0);
1596     GNUNET_free (op);
1597     GNUNET_SERVICE_client_drop (cs->client);
1598     return;
1599   }
1600   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1601                                        UINT32_MAX);
1602   op->peer = msg->target_peer;
1603   op->result_mode = ntohl (msg->result_mode);
1604   op->client_request_id = ntohl (msg->request_id);
1605   op->byzantine = msg->byzantine;
1606   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1607   op->force_full = msg->force_full;
1608   op->force_delta = msg->force_delta;
1609   context = GNUNET_MQ_extract_nested_mh (msg);
1610
1611   /* Advance generation values, so that
1612      mutations won't interfer with the running operation. */
1613   op->set = set;
1614   op->generation_created = set->current_generation;
1615   advance_generation (set);
1616   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1617                                set->ops_tail,
1618                                op);
1619   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620               "Creating new CADET channel to port %s for set operation type %u\n",
1621               GNUNET_h2s (&msg->app_id),
1622               set->operation);
1623   op->channel = GNUNET_CADET_channel_create (cadet,
1624                                              op,
1625                                              &msg->target_peer,
1626                                              &msg->app_id,
1627                                              GNUNET_CADET_OPTION_RELIABLE,
1628                                              &channel_window_cb,
1629                                              &channel_end_cb,
1630                                              cadet_handlers);
1631   op->mq = GNUNET_CADET_get_mq (op->channel);
1632   op->state = set->vt->evaluate (op,
1633                                  context);
1634   if (NULL == op->state)
1635   {
1636     GNUNET_break (0);
1637     GNUNET_SERVICE_client_drop (cs->client);
1638     return;
1639   }
1640   GNUNET_SERVICE_client_continue (cs->client);
1641 }
1642
1643
1644 /**
1645  * Handle an ack from a client, and send the next element. Note
1646  * that we only expect acks for set elements, not after the
1647  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1648  *
1649  * @param cls client the client
1650  * @param ack the message
1651  */
1652 static void
1653 handle_client_iter_ack (void *cls,
1654                         const struct GNUNET_SET_IterAckMessage *ack)
1655 {
1656   struct ClientState *cs = cls;
1657   struct Set *set;
1658
1659   if (NULL == (set = cs->set))
1660   {
1661     /* client without a set acknowledged receiving a value */
1662     GNUNET_break (0);
1663     GNUNET_SERVICE_client_drop (cs->client);
1664     return;
1665   }
1666   if (NULL == set->iter)
1667   {
1668     /* client sent an ack, but we were not expecting one (as
1669        set iteration has finished) */
1670     GNUNET_break (0);
1671     GNUNET_SERVICE_client_drop (cs->client);
1672     return;
1673   }
1674   GNUNET_SERVICE_client_continue (cs->client);
1675   if (ntohl (ack->send_more))
1676   {
1677     send_client_element (set);
1678   }
1679   else
1680   {
1681     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1682     set->iter = NULL;
1683     set->iteration_id++;
1684   }
1685 }
1686
1687
1688 /**
1689  * Handle a request from the client to copy a set.
1690  *
1691  * @param cls the client
1692  * @param mh the message
1693  */
1694 static void
1695 handle_client_copy_lazy_prepare (void *cls,
1696                                  const struct GNUNET_MessageHeader *mh)
1697 {
1698   struct ClientState *cs = cls;
1699   struct Set *set;
1700   struct LazyCopyRequest *cr;
1701   struct GNUNET_MQ_Envelope *ev;
1702   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1703
1704   if (NULL == (set = cs->set))
1705   {
1706     /* client without a set requested an operation */
1707     GNUNET_break (0);
1708     GNUNET_SERVICE_client_drop (cs->client);
1709     return;
1710   }
1711   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1712               "Client requested creation of lazy copy\n");
1713   cr = GNUNET_new (struct LazyCopyRequest);
1714   cr->cookie = ++lazy_copy_cookie;
1715   cr->source_set = set;
1716   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1717                                lazy_copy_tail,
1718                                cr);
1719   ev = GNUNET_MQ_msg (resp_msg,
1720                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1721   resp_msg->cookie = cr->cookie;
1722   GNUNET_MQ_send (set->cs->mq,
1723                   ev);
1724   GNUNET_SERVICE_client_continue (cs->client);
1725 }
1726
1727
1728 /**
1729  * Handle a request from the client to connect to a copy of a set.
1730  *
1731  * @param cls the client
1732  * @param msg the message
1733  */
1734 static void
1735 handle_client_copy_lazy_connect (void *cls,
1736                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1737 {
1738   struct ClientState *cs = cls;
1739   struct LazyCopyRequest *cr;
1740   struct Set *set;
1741   int found;
1742
1743   if (NULL != cs->set)
1744   {
1745     /* There can only be one set per client */
1746     GNUNET_break (0);
1747     GNUNET_SERVICE_client_drop (cs->client);
1748     return;
1749   }
1750   found = GNUNET_NO;
1751   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1752   {
1753     if (cr->cookie == msg->cookie)
1754     {
1755       found = GNUNET_YES;
1756       break;
1757     }
1758   }
1759   if (GNUNET_NO == found)
1760   {
1761     /* client asked for copy with cookie we don't know */
1762     GNUNET_break (0);
1763     GNUNET_SERVICE_client_drop (cs->client);
1764     return;
1765   }
1766   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1767                                lazy_copy_tail,
1768                                cr);
1769   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1770               "Client %p requested use of lazy copy\n",
1771               cs);
1772   set = GNUNET_new (struct Set);
1773   switch (cr->source_set->operation)
1774   {
1775   case GNUNET_SET_OPERATION_INTERSECTION:
1776     set->vt = _GSS_intersection_vt ();
1777     break;
1778   case GNUNET_SET_OPERATION_UNION:
1779     set->vt = _GSS_union_vt ();
1780     break;
1781   default:
1782     GNUNET_assert (0);
1783     return;
1784   }
1785
1786   if (NULL == set->vt->copy_state)
1787   {
1788     /* Lazy copy not supported for this set operation */
1789     GNUNET_break (0);
1790     GNUNET_free (set);
1791     GNUNET_free (cr);
1792     GNUNET_SERVICE_client_drop (cs->client);
1793     return;
1794   }
1795
1796   set->operation = cr->source_set->operation;
1797   set->state = set->vt->copy_state (cr->source_set->state);
1798   set->content = cr->source_set->content;
1799   set->content->refcount++;
1800
1801   set->current_generation = cr->source_set->current_generation;
1802   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1803   set->excluded_generations
1804     = GNUNET_memdup (cr->source_set->excluded_generations,
1805                      set->excluded_generations_size * sizeof (struct GenerationRange));
1806
1807   /* Advance the generation of the new set, so that mutations to the
1808      of the cloned set and the source set are independent. */
1809   advance_generation (set);
1810   set->cs = cs;
1811   cs->set = set;
1812   GNUNET_free (cr);
1813   GNUNET_SERVICE_client_continue (cs->client);
1814 }
1815
1816
1817 /**
1818  * Handle a request from the client to cancel a running set operation.
1819  *
1820  * @param cls the client
1821  * @param msg the message
1822  */
1823 static void
1824 handle_client_cancel (void *cls,
1825                       const struct GNUNET_SET_CancelMessage *msg)
1826 {
1827   struct ClientState *cs = cls;
1828   struct Set *set;
1829   struct Operation *op;
1830   int found;
1831
1832   if (NULL == (set = cs->set))
1833   {
1834     /* client without a set requested an operation */
1835     GNUNET_break (0);
1836     GNUNET_SERVICE_client_drop (cs->client);
1837     return;
1838   }
1839   found = GNUNET_NO;
1840   for (op = set->ops_head; NULL != op; op = op->next)
1841   {
1842     if (op->client_request_id == ntohl (msg->request_id))
1843     {
1844       found = GNUNET_YES;
1845       break;
1846     }
1847   }
1848   if (GNUNET_NO == found)
1849   {
1850     /* It may happen that the operation was already destroyed due to
1851      * the other peer disconnecting.  The client may not know about this
1852      * yet and try to cancel the (just barely non-existent) operation.
1853      * So this is not a hard error.
1854      */
1855     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1856                 "Client canceled non-existent op %u\n",
1857                 (uint32_t) ntohl (msg->request_id));
1858   }
1859   else
1860   {
1861     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1862                 "Client requested cancel for op %u\n",
1863                 (uint32_t) ntohl (msg->request_id));
1864     _GSS_operation_destroy (op,
1865                             GNUNET_YES);
1866   }
1867   GNUNET_SERVICE_client_continue (cs->client);
1868 }
1869
1870
1871 /**
1872  * Handle a request from the client to accept a set operation that
1873  * came from a remote peer.  We forward the accept to the associated
1874  * operation for handling
1875  *
1876  * @param cls the client
1877  * @param msg the message
1878  */
1879 static void
1880 handle_client_accept (void *cls,
1881                       const struct GNUNET_SET_AcceptMessage *msg)
1882 {
1883   struct ClientState *cs = cls;
1884   struct Set *set;
1885   struct Operation *op;
1886   struct GNUNET_SET_ResultMessage *result_message;
1887   struct GNUNET_MQ_Envelope *ev;
1888   struct Listener *listener;
1889
1890   if (NULL == (set = cs->set))
1891   {
1892     /* client without a set requested to accept */
1893     GNUNET_break (0);
1894     GNUNET_SERVICE_client_drop (cs->client);
1895     return;
1896   }
1897   op = get_incoming (ntohl (msg->accept_reject_id));
1898   if (NULL == op)
1899   {
1900     /* It is not an error if the set op does not exist -- it may
1901      * have been destroyed when the partner peer disconnected. */
1902     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1903                 "Client %p accepted request %u of listener %p that is no longer active\n",
1904                 cs,
1905                 ntohl (msg->accept_reject_id),
1906                 cs->listener);
1907     ev = GNUNET_MQ_msg (result_message,
1908                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1909     result_message->request_id = msg->request_id;
1910     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1911     GNUNET_MQ_send (set->cs->mq,
1912                     ev);
1913     GNUNET_SERVICE_client_continue (cs->client);
1914     return;
1915   }
1916   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917               "Client accepting request %u\n",
1918               (uint32_t) ntohl (msg->accept_reject_id));
1919   listener = op->listener;
1920   op->listener = NULL;
1921   GNUNET_CONTAINER_DLL_remove (listener->op_head,
1922                                listener->op_tail,
1923                                op);
1924   op->set = set;
1925   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1926                                set->ops_tail,
1927                                op);
1928   op->client_request_id = ntohl (msg->request_id);
1929   op->result_mode = ntohl (msg->result_mode);
1930   op->byzantine = msg->byzantine;
1931   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1932   op->force_full = msg->force_full;
1933   op->force_delta = msg->force_delta;
1934
1935   /* Advance generation values, so that future mutations do not
1936      interfer with the running operation. */
1937   op->generation_created = set->current_generation;
1938   advance_generation (set);
1939   GNUNET_assert (NULL == op->state);
1940   op->state = set->vt->accept (op);
1941   if (NULL == op->state)
1942   {
1943     GNUNET_break (0);
1944     GNUNET_SERVICE_client_drop (cs->client);
1945     return;
1946   }
1947   /* Now allow CADET to continue, as we did not do this in
1948      #handle_incoming_msg (as we wanted to first see if the
1949      local client would accept the request). */
1950   GNUNET_CADET_receive_done (op->channel);
1951   GNUNET_SERVICE_client_continue (cs->client);
1952 }
1953
1954
1955 /**
1956  * Called to clean up, after a shutdown has been requested.
1957  *
1958  * @param cls closure, NULL
1959  */
1960 static void
1961 shutdown_task (void *cls)
1962 {
1963   /* Delay actual shutdown to allow service to disconnect clients */
1964   in_shutdown = GNUNET_YES;
1965   if (0 == num_clients)
1966   {
1967     if (NULL != cadet)
1968     {
1969       GNUNET_CADET_disconnect (cadet);
1970       cadet = NULL;
1971     }
1972   }
1973   GNUNET_STATISTICS_destroy (_GSS_statistics,
1974                              GNUNET_YES);
1975   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976               "handled shutdown request\n");
1977 }
1978
1979
1980 /**
1981  * Function called by the service's run
1982  * method to run service-specific setup code.
1983  *
1984  * @param cls closure
1985  * @param cfg configuration to use
1986  * @param service the initialized service
1987  */
1988 static void
1989 run (void *cls,
1990      const struct GNUNET_CONFIGURATION_Handle *cfg,
1991      struct GNUNET_SERVICE_Handle *service)
1992 {
1993   /* FIXME: need to modify SERVICE (!) API to allow
1994      us to run a shutdown task *after* clients were
1995      forcefully disconnected! */
1996   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1997                                  NULL);
1998   _GSS_statistics = GNUNET_STATISTICS_create ("set",
1999                                               cfg);
2000   cadet = GNUNET_CADET_connect (cfg);
2001   if (NULL == cadet)
2002   {
2003     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2004                 _("Could not connect to CADET service\n"));
2005     GNUNET_SCHEDULER_shutdown ();
2006     return;
2007   }
2008 }
2009
2010
2011 /**
2012  * Define "main" method using service macro.
2013  */
2014 GNUNET_SERVICE_MAIN
2015 ("set",
2016  GNUNET_SERVICE_OPTION_NONE,
2017  &run,
2018  &client_connect_cb,
2019  &client_disconnect_cb,
2020  NULL,
2021  GNUNET_MQ_hd_fixed_size (client_accept,
2022                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2023                           struct GNUNET_SET_AcceptMessage,
2024                           NULL),
2025  GNUNET_MQ_hd_fixed_size (client_iter_ack,
2026                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2027                           struct GNUNET_SET_IterAckMessage,
2028                           NULL),
2029  GNUNET_MQ_hd_var_size (client_mutation,
2030                         GNUNET_MESSAGE_TYPE_SET_ADD,
2031                         struct GNUNET_SET_ElementMessage,
2032                         NULL),
2033  GNUNET_MQ_hd_fixed_size (client_create_set,
2034                           GNUNET_MESSAGE_TYPE_SET_CREATE,
2035                           struct GNUNET_SET_CreateMessage,
2036                           NULL),
2037  GNUNET_MQ_hd_fixed_size (client_iterate,
2038                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2039                           struct GNUNET_MessageHeader,
2040                           NULL),
2041  GNUNET_MQ_hd_var_size (client_evaluate,
2042                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2043                         struct GNUNET_SET_EvaluateMessage,
2044                         NULL),
2045  GNUNET_MQ_hd_fixed_size (client_listen,
2046                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
2047                           struct GNUNET_SET_ListenMessage,
2048                           NULL),
2049  GNUNET_MQ_hd_fixed_size (client_reject,
2050                           GNUNET_MESSAGE_TYPE_SET_REJECT,
2051                           struct GNUNET_SET_RejectMessage,
2052                           NULL),
2053  GNUNET_MQ_hd_var_size (client_mutation,
2054                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
2055                         struct GNUNET_SET_ElementMessage,
2056                         NULL),
2057  GNUNET_MQ_hd_fixed_size (client_cancel,
2058                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2059                           struct GNUNET_SET_CancelMessage,
2060                           NULL),
2061  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2062                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2063                           struct GNUNET_MessageHeader,
2064                           NULL),
2065  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2066                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2067                           struct GNUNET_SET_CopyLazyConnectMessage,
2068                           NULL),
2069  GNUNET_MQ_handler_end ());
2070
2071
2072 /* end of gnunet-service-set.c */