-comments: the world ain't all male
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14      
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file set/gnunet-service-set.c
20  * @brief two-peer set operations
21  * @author Florian Dold
22  * @author Christian Grothoff
23  */
24 #include "gnunet-service-set.h"
25 #include "gnunet-service-set_union.h"
26 #include "gnunet-service-set_intersection.h"
27 #include "gnunet-service-set_protocol.h"
28 #include "gnunet_statistics_service.h"
29
30 /**
31  * How long do we hold on to an incoming channel if there is
32  * no local listener before giving up?
33  */
34 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35
36
37 /**
38  * Lazy copy requests made by a client.
39  */
40 struct LazyCopyRequest
41 {
42   /**
43    * Kept in a DLL.
44    */
45   struct LazyCopyRequest *prev;
46
47   /**
48    * Kept in a DLL.
49    */
50   struct LazyCopyRequest *next;
51
52   /**
53    * Which set are we supposed to copy?
54    */
55   struct Set *source_set;
56
57   /**
58    * Cookie identifying the request.
59    */
60   uint32_t cookie;
61
62 };
63
64
65 /**
66  * A listener is inhabited by a client, and waits for evaluation
67  * requests from remote peers.
68  */
69 struct Listener
70 {
71   /**
72    * Listeners are held in a doubly linked list.
73    */
74   struct Listener *next;
75
76   /**
77    * Listeners are held in a doubly linked list.
78    */
79   struct Listener *prev;
80
81   /**
82    * Head of DLL of operations this listener is responsible for.
83    * Once the client has accepted/declined the operation, the
84    * operation is moved to the respective set's operation DLLS.
85    */
86   struct Operation *op_head;
87
88   /**
89    * Tail of DLL of operations this listener is responsible for.
90    * Once the client has accepted/declined the operation, the
91    * operation is moved to the respective set's operation DLLS.
92    */
93   struct Operation *op_tail;
94
95   /**
96    * Client that owns the listener.
97    * Only one client may own a listener.
98    */
99   struct ClientState *cs;
100
101   /**
102    * The port we are listening on with CADET.
103    */
104   struct GNUNET_CADET_Port *open_port;
105
106   /**
107    * Application ID for the operation, used to distinguish
108    * multiple operations of the same type with the same peer.
109    */
110   struct GNUNET_HashCode app_id;
111
112   /**
113    * The type of the operation.
114    */
115   enum GNUNET_SET_OperationType operation;
116 };
117
118
119 /**
120  * Handle to the cadet service, used to listen for and connect to
121  * remote peers.
122  */
123 static struct GNUNET_CADET_Handle *cadet;
124
125 /**
126  * DLL of lazy copy requests by this client.
127  */
128 static struct LazyCopyRequest *lazy_copy_head;
129
130 /**
131  * DLL of lazy copy requests by this client.
132  */
133 static struct LazyCopyRequest *lazy_copy_tail;
134
135 /**
136  * Generator for unique cookie we set per lazy copy request.
137  */
138 static uint32_t lazy_copy_cookie;
139
140 /**
141  * Statistics handle.
142  */
143 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
144
145 /**
146  * Listeners are held in a doubly linked list.
147  */
148 static struct Listener *listener_head;
149
150 /**
151  * Listeners are held in a doubly linked list.
152  */
153 static struct Listener *listener_tail;
154
155 /**
156  * Number of active clients.
157  */
158 static unsigned int num_clients;
159
160 /**
161  * Are we in shutdown? if #GNUNET_YES and the number of clients
162  * drops to zero, disconnect from CADET.
163  */
164 static int in_shutdown;
165
166 /**
167  * Counter for allocating unique IDs for clients, used to identify
168  * incoming operation requests from remote peers, that the client can
169  * choose to accept or refuse.  0 must not be used (reserved for
170  * uninitialized).
171  */
172 static uint32_t suggest_id;
173
174
175 /**
176  * Get the incoming socket associated with the given id.
177  *
178  * @param listener the listener to look in
179  * @param id id to look for
180  * @return the incoming socket associated with the id,
181  *         or NULL if there is none
182  */
183 static struct Operation *
184 get_incoming (uint32_t id)
185 {
186   for (struct Listener *listener = listener_head;
187        NULL != listener;
188        listener = listener->next)
189   {
190     for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191       if (op->suggest_id == id)
192         return op;
193   }
194   return NULL;
195 }
196
197
198 /**
199  * Destroy an incoming request from a remote peer
200  *
201  * @param op remote request to destroy
202  */
203 static void
204 incoming_destroy (struct Operation *op)
205 {
206   struct Listener *listener;
207   struct GNUNET_CADET_Channel *channel;
208
209   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
210               "Destroying incoming operation %p\n",
211               op);
212   if (NULL != (listener = op->listener))
213   {
214     GNUNET_CONTAINER_DLL_remove (listener->op_head,
215                                  listener->op_tail,
216                                  op);
217     op->listener = NULL;
218   }
219   if (NULL != op->timeout_task)
220   {
221     GNUNET_SCHEDULER_cancel (op->timeout_task);
222     op->timeout_task = NULL;
223   }
224   if (NULL != (channel = op->channel))
225   {
226     op->channel = NULL;
227     GNUNET_CADET_channel_destroy (channel);
228   }
229 }
230
231
232 /**
233  * Context for the #garbage_collect_cb().
234  */
235 struct GarbageContext
236 {
237
238   /**
239    * Map for which we are garbage collecting removed elements.
240    */
241   struct GNUNET_CONTAINER_MultiHashMap *map;
242
243   /**
244    * Lowest generation for which an operation is still pending.
245    */
246   unsigned int min_op_generation;
247
248   /**
249    * Largest generation for which an operation is still pending.
250    */
251   unsigned int max_op_generation;
252
253 };
254
255
256 /**
257  * Function invoked to check if an element can be removed from
258  * the set's history because it is no longer needed.
259  *
260  * @param cls the `struct GarbageContext *`
261  * @param key key of the element in the map
262  * @param value the `struct ElementEntry *`
263  * @return #GNUNET_OK (continue to iterate)
264  */
265 static int
266 garbage_collect_cb (void *cls,
267                     const struct GNUNET_HashCode *key,
268                     void *value)
269 {
270   //struct GarbageContext *gc = cls;
271   //struct ElementEntry *ee = value;
272
273   //if (GNUNET_YES != ee->removed)
274   //  return GNUNET_OK;
275   //if ( (gc->max_op_generation < ee->generation_added) ||
276   //     (ee->generation_removed > gc->min_op_generation) )
277   //{
278   //  GNUNET_assert (GNUNET_YES ==
279   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
280   //                                                       key,
281   //                                                       ee));
282   //  GNUNET_free (ee);
283   //}
284   return GNUNET_OK;
285 }
286
287
288 /**
289  * Collect and destroy elements that are not needed anymore, because
290  * their lifetime (as determined by their generation) does not overlap
291  * with any active set operation.
292  *
293  * @param set set to garbage collect
294  */
295 static void
296 collect_generation_garbage (struct Set *set)
297 {
298   struct GarbageContext gc;
299
300   gc.min_op_generation = UINT_MAX;
301   gc.max_op_generation = 0;
302   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
303   {
304     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
305                                        op->generation_created);
306     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
307                                        op->generation_created);
308   }
309   gc.map = set->content->elements;
310   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
311                                          &garbage_collect_cb,
312                                          &gc);
313 }
314
315
316 /**
317  * Is @a generation in the range of exclusions?
318  *
319  * @param generation generation to query
320  * @param excluded array of generations where the element is excluded
321  * @param excluded_size length of the @a excluded array
322  * @return #GNUNET_YES if @a generation is in any of the ranges
323  */
324 static int
325 is_excluded_generation (unsigned int generation,
326                         struct GenerationRange *excluded,
327                         unsigned int excluded_size)
328 {
329   for (unsigned int i = 0; i < excluded_size; i++)
330     if ( (generation >= excluded[i].start) &&
331          (generation < excluded[i].end) )
332       return GNUNET_YES;
333   return GNUNET_NO;
334 }
335
336
337 /**
338  * Is element @a ee part of the set during @a query_generation?
339  *
340  * @param ee element to test
341  * @param query_generation generation to query
342  * @param excluded array of generations where the element is excluded
343  * @param excluded_size length of the @a excluded array
344  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
345  */
346 static int
347 is_element_of_generation (struct ElementEntry *ee,
348                           unsigned int query_generation,
349                           struct GenerationRange *excluded,
350                           unsigned int excluded_size)
351 {
352   struct MutationEvent *mut;
353   int is_present;
354
355   GNUNET_assert (NULL != ee->mutations);
356   if (GNUNET_YES ==
357       is_excluded_generation (query_generation,
358                               excluded,
359                               excluded_size))
360   {
361     GNUNET_break (0);
362     return GNUNET_NO;
363   }
364
365   is_present = GNUNET_NO;
366
367   /* Could be made faster with binary search, but lists
368      are small, so why bother. */
369   for (unsigned int i = 0; i < ee->mutations_size; i++)
370   {
371     mut = &ee->mutations[i];
372
373     if (mut->generation > query_generation)
374     {
375       /* The mutation doesn't apply to our generation
376          anymore.  We can'b break here, since mutations aren't
377          sorted by generation. */
378       continue;
379     }
380
381     if (GNUNET_YES ==
382         is_excluded_generation (mut->generation,
383                                 excluded,
384                                 excluded_size))
385     {
386       /* The generation is excluded (because it belongs to another
387          fork via a lazy copy) and thus mutations aren't considered
388          for membership testing. */
389       continue;
390     }
391
392     /* This would be an inconsistency in how we manage mutations. */
393     if ( (GNUNET_YES == is_present) &&
394          (GNUNET_YES == mut->added) )
395       GNUNET_assert (0);
396     /* Likewise. */
397     if ( (GNUNET_NO == is_present) &&
398          (GNUNET_NO == mut->added) )
399       GNUNET_assert (0);
400
401     is_present = mut->added;
402   }
403
404   return is_present;
405 }
406
407
408 /**
409  * Is element @a ee part of the set used by @a op?
410  *
411  * @param ee element to test
412  * @param op operation the defines the set and its generation
413  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
414  */
415 int
416 _GSS_is_element_of_operation (struct ElementEntry *ee,
417                               struct Operation *op)
418 {
419   return is_element_of_generation (ee,
420                                    op->generation_created,
421                                    op->set->excluded_generations,
422                                    op->set->excluded_generations_size);
423 }
424
425
426 /**
427  * Destroy the given operation.  Used for any operation where both
428  * peers were known and that thus actually had a vt and channel.  Must
429  * not be used for operations where 'listener' is still set and we do
430  * not know the other peer.
431  *
432  * Call the implementation-specific cancel function of the operation.
433  * Disconnects from the remote peer.  Does not disconnect the client,
434  * as there may be multiple operations per set.
435  *
436  * @param op operation to destroy
437  * @param gc #GNUNET_YES to perform garbage collection on the set
438  */
439 void
440 _GSS_operation_destroy (struct Operation *op,
441                         int gc)
442 {
443   struct Set *set = op->set;
444   struct GNUNET_CADET_Channel *channel;
445
446   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
447               "Destroying operation %p\n",
448               op);
449   GNUNET_assert (NULL == op->listener);
450   if (NULL != op->state)
451   {
452     set->vt->cancel (op);
453     op->state = NULL;
454   }
455   if (NULL != set)
456   {
457     GNUNET_CONTAINER_DLL_remove (set->ops_head,
458                                  set->ops_tail,
459                                  op);
460     op->set = NULL;
461   }
462   if (NULL != op->context_msg)
463   {
464     GNUNET_free (op->context_msg);
465     op->context_msg = NULL;
466   }
467   if (NULL != (channel = op->channel))
468   {
469     /* This will free op; called conditionally as this helper function
470        is also called from within the channel disconnect handler. */
471     op->channel = NULL;
472     GNUNET_CADET_channel_destroy (channel);
473   }
474   if ( (NULL != set) &&
475        (GNUNET_YES == gc) )
476     collect_generation_garbage (set);
477   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
478    * there was a channel end handler that will free 'op' on the call stack. */
479 }
480
481
482 /**
483  * Callback called when a client connects to the service.
484  *
485  * @param cls closure for the service
486  * @param c the new client that connected to the service
487  * @param mq the message queue used to send messages to the client
488  * @return @a `struct ClientState`
489  */
490 static void *
491 client_connect_cb (void *cls,
492                    struct GNUNET_SERVICE_Client *c,
493                    struct GNUNET_MQ_Handle *mq)
494 {
495   struct ClientState *cs;
496
497   num_clients++;
498   cs = GNUNET_new (struct ClientState);
499   cs->client = c;
500   cs->mq = mq;
501   return cs;
502 }
503
504
505 /**
506  * Iterator over hash map entries to free element entries.
507  *
508  * @param cls closure
509  * @param key current key code
510  * @param value a `struct ElementEntry *` to be free'd
511  * @return #GNUNET_YES (continue to iterate)
512  */
513 static int
514 destroy_elements_iterator (void *cls,
515                            const struct GNUNET_HashCode *key,
516                            void *value)
517 {
518   struct ElementEntry *ee = value;
519
520   GNUNET_free_non_null (ee->mutations);
521   GNUNET_free (ee);
522   return GNUNET_YES;
523 }
524
525
526 /**
527  * Clean up after a client has disconnected
528  *
529  * @param cls closure, unused
530  * @param client the client to clean up after
531  * @param internal_cls the `struct ClientState`
532  */
533 static void
534 client_disconnect_cb (void *cls,
535                       struct GNUNET_SERVICE_Client *client,
536                       void *internal_cls)
537 {
538   struct ClientState *cs = internal_cls;
539   struct Operation *op;
540   struct Listener *listener;
541   struct Set *set;
542
543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544               "Client disconnected, cleaning up\n");
545   if (NULL != (set = cs->set))
546   {
547     struct SetContent *content = set->content;
548     struct PendingMutation *pm;
549     struct PendingMutation *pm_current;
550     struct LazyCopyRequest *lcr;
551
552     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553                 "Destroying client's set\n");
554     /* Destroy pending set operations */
555     while (NULL != set->ops_head)
556       _GSS_operation_destroy (set->ops_head,
557                               GNUNET_NO);
558
559     /* Destroy operation-specific state */
560     GNUNET_assert (NULL != set->state);
561     set->vt->destroy_set (set->state);
562     set->state = NULL;
563
564     /* Clean up ongoing iterations */
565     if (NULL != set->iter)
566     {
567       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
568       set->iter = NULL;
569       set->iteration_id++;
570     }
571
572     /* discard any pending mutations that reference this set */
573     pm = content->pending_mutations_head;
574     while (NULL != pm)
575     {
576       pm_current = pm;
577       pm = pm->next;
578       if (pm_current->set == set)
579       {
580         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
581                                      content->pending_mutations_tail,
582                                      pm_current);
583         GNUNET_free (pm_current);
584       }
585     }
586
587     /* free set content (or at least decrement RC) */
588     set->content = NULL;
589     GNUNET_assert (0 != content->refcount);
590     content->refcount--;
591     if (0 == content->refcount)
592     {
593       GNUNET_assert (NULL != content->elements);
594       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
595                                              &destroy_elements_iterator,
596                                              NULL);
597       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
598       content->elements = NULL;
599       GNUNET_free (content);
600     }
601     GNUNET_free_non_null (set->excluded_generations);
602     set->excluded_generations = NULL;
603
604     /* remove set from pending copy requests */
605     lcr = lazy_copy_head;
606     while (NULL != lcr)
607     {
608       struct LazyCopyRequest *lcr_current = lcr;
609
610       lcr = lcr->next;
611       if (lcr_current->source_set == set)
612       {
613         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
614                                      lazy_copy_tail,
615                                      lcr_current);
616         GNUNET_free (lcr_current);
617       }
618     }
619     GNUNET_free (set);
620   }
621
622   if (NULL != (listener = cs->listener))
623   {
624     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
625                 "Destroying client's listener\n");
626     GNUNET_CADET_close_port (listener->open_port);
627     listener->open_port = NULL;
628     while (NULL != (op = listener->op_head))
629     {
630       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
631                   "Destroying incoming operation `%u' from peer `%s'\n",
632                   (unsigned int) op->client_request_id,
633                   GNUNET_i2s (&op->peer));
634       incoming_destroy (op);
635     }
636     GNUNET_CONTAINER_DLL_remove (listener_head,
637                                  listener_tail,
638                                  listener);
639     GNUNET_free (listener);
640   }
641   GNUNET_free (cs);
642   num_clients--;
643   if ( (GNUNET_YES == in_shutdown) &&
644        (0 == num_clients) )
645   {
646     if (NULL != cadet)
647     {
648       GNUNET_CADET_disconnect (cadet);
649       cadet = NULL;
650     }
651   }
652 }
653
654
655 /**
656  * Check a request for a set operation from another peer.
657  *
658  * @param cls the operation state
659  * @param msg the received message
660  * @return #GNUNET_OK if the channel should be kept alive,
661  *         #GNUNET_SYSERR to destroy the channel
662  */
663 static int
664 check_incoming_msg (void *cls,
665                     const struct OperationRequestMessage *msg)
666 {
667   struct Operation *op = cls;
668   struct Listener *listener = op->listener;
669   const struct GNUNET_MessageHeader *nested_context;
670
671   /* double operation request */
672   if (0 != op->suggest_id)
673   {
674     GNUNET_break_op (0);
675     return GNUNET_SYSERR;
676   }
677   /* This should be equivalent to the previous condition, but can't hurt to check twice */
678   if (NULL == op->listener)
679   {
680     GNUNET_break (0);
681     return GNUNET_SYSERR;
682   }
683   if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
684   {
685     GNUNET_break_op (0);
686     return GNUNET_SYSERR;
687   }
688   nested_context = GNUNET_MQ_extract_nested_mh (msg);
689   if ( (NULL != nested_context) &&
690        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
691   {
692     GNUNET_break_op (0);
693     return GNUNET_SYSERR;
694   }
695   return GNUNET_OK;
696 }
697
698
699 /**
700  * Handle a request for a set operation from another peer.  Checks if we
701  * have a listener waiting for such a request (and in that case initiates
702  * asking the listener about accepting the connection). If no listener
703  * is waiting, we queue the operation request in hope that a listener
704  * shows up soon (before timeout).
705  *
706  * This msg is expected as the first and only msg handled through the
707  * non-operation bound virtual table, acceptance of this operation replaces
708  * our virtual table and subsequent msgs would be routed differently (as
709  * we then know what type of operation this is).
710  *
711  * @param cls the operation state
712  * @param msg the received message
713  * @return #GNUNET_OK if the channel should be kept alive,
714  *         #GNUNET_SYSERR to destroy the channel
715  */
716 static void
717 handle_incoming_msg (void *cls,
718                      const struct OperationRequestMessage *msg)
719 {
720   struct Operation *op = cls;
721   struct Listener *listener = op->listener;
722   const struct GNUNET_MessageHeader *nested_context;
723   struct GNUNET_MQ_Envelope *env;
724   struct GNUNET_SET_RequestMessage *cmsg;
725
726   nested_context = GNUNET_MQ_extract_nested_mh (msg);
727   /* Make a copy of the nested_context (application-specific context
728      information that is opaque to set) so we can pass it to the
729      listener later on */
730   if (NULL != nested_context)
731     op->context_msg = GNUNET_copy_message (nested_context);
732   op->remote_element_count = ntohl (msg->element_count);
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734               "Received P2P operation request (op %u, port %s) for active listener\n",
735               (uint32_t) ntohl (msg->operation),
736               GNUNET_h2s (&op->listener->app_id));
737   GNUNET_assert (0 == op->suggest_id);
738   if (0 == suggest_id)
739     suggest_id++;
740   op->suggest_id = suggest_id++;
741   GNUNET_assert (NULL != op->timeout_task);
742   GNUNET_SCHEDULER_cancel (op->timeout_task);
743   op->timeout_task = NULL;
744   env = GNUNET_MQ_msg_nested_mh (cmsg,
745                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
746                                  op->context_msg);
747   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748               "Suggesting incoming request with accept id %u to listener %p of client %p\n",
749               op->suggest_id,
750               listener,
751               listener->cs);
752   cmsg->accept_id = htonl (op->suggest_id);
753   cmsg->peer_id = op->peer;
754   GNUNET_MQ_send (listener->cs->mq,
755                   env);
756   /* NOTE: GNUNET_CADET_receive_done() will be called in
757      #handle_client_accept() */
758 }
759
760
761 /**
762  * Add an element to @a set as specified by @a msg
763  *
764  * @param set set to manipulate
765  * @param msg message specifying the change
766  */
767 static void
768 execute_add (struct Set *set,
769              const struct GNUNET_SET_ElementMessage *msg)
770 {
771   struct GNUNET_SET_Element el;
772   struct ElementEntry *ee;
773   struct GNUNET_HashCode hash;
774
775   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
776   el.size = ntohs (msg->header.size) - sizeof (*msg);
777   el.data = &msg[1];
778   el.element_type = ntohs (msg->element_type);
779   GNUNET_SET_element_hash (&el,
780                            &hash);
781   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
782                                           &hash);
783   if (NULL == ee)
784   {
785     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786                 "Client inserts element %s of size %u\n",
787                 GNUNET_h2s (&hash),
788                 el.size);
789     ee = GNUNET_malloc (el.size + sizeof (*ee));
790     ee->element.size = el.size;
791     GNUNET_memcpy (&ee[1],
792             el.data,
793             el.size);
794     ee->element.data = &ee[1];
795     ee->element.element_type = el.element_type;
796     ee->remote = GNUNET_NO;
797     ee->mutations = NULL;
798     ee->mutations_size = 0;
799     ee->element_hash = hash;
800     GNUNET_break (GNUNET_YES ==
801                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
802                                                      &ee->element_hash,
803                                                      ee,
804                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
805   }
806   else if (GNUNET_YES ==
807            is_element_of_generation (ee,
808                                      set->current_generation,
809                                      set->excluded_generations,
810                                      set->excluded_generations_size))
811   {
812     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813                 "Client inserted element %s of size %u twice (ignored)\n",
814                 GNUNET_h2s (&hash),
815                 el.size);
816
817     /* same element inserted twice */
818     return;
819   }
820
821   {
822     struct MutationEvent mut = {
823       .generation = set->current_generation,
824       .added = GNUNET_YES
825     };
826     GNUNET_array_append (ee->mutations,
827                          ee->mutations_size,
828                          mut);
829   }
830   set->vt->add (set->state,
831                 ee);
832 }
833
834
835 /**
836  * Remove an element from @a set as specified by @a msg
837  *
838  * @param set set to manipulate
839  * @param msg message specifying the change
840  */
841 static void
842 execute_remove (struct Set *set,
843                 const struct GNUNET_SET_ElementMessage *msg)
844 {
845   struct GNUNET_SET_Element el;
846   struct ElementEntry *ee;
847   struct GNUNET_HashCode hash;
848
849   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
850   el.size = ntohs (msg->header.size) - sizeof (*msg);
851   el.data = &msg[1];
852   el.element_type = ntohs (msg->element_type);
853   GNUNET_SET_element_hash (&el, &hash);
854   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
855                                           &hash);
856   if (NULL == ee)
857   {
858     /* Client tried to remove non-existing element. */
859     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
860                 "Client removes non-existing element of size %u\n",
861                 el.size);
862     return;
863   }
864   if (GNUNET_NO ==
865       is_element_of_generation (ee,
866                                 set->current_generation,
867                                 set->excluded_generations,
868                                 set->excluded_generations_size))
869   {
870     /* Client tried to remove element twice */
871     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872                 "Client removed element of size %u twice (ignored)\n",
873                 el.size);
874     return;
875   }
876   else
877   {
878     struct MutationEvent mut = {
879       .generation = set->current_generation,
880       .added = GNUNET_NO
881     };
882
883     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
884                 "Client removes element of size %u\n",
885                 el.size);
886
887     GNUNET_array_append (ee->mutations,
888                          ee->mutations_size,
889                          mut);
890   }
891   set->vt->remove (set->state,
892                    ee);
893 }
894
895
896 /**
897  * Perform a mutation on a set as specified by the @a msg
898  *
899  * @param set the set to mutate
900  * @param msg specification of what to change
901  */
902 static void
903 execute_mutation (struct Set *set,
904                   const struct GNUNET_SET_ElementMessage *msg)
905 {
906   switch (ntohs (msg->header.type))
907   {
908     case GNUNET_MESSAGE_TYPE_SET_ADD:
909       execute_add (set, msg);
910       break;
911     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
912       execute_remove (set, msg);
913       break;
914     default:
915       GNUNET_break (0);
916   }
917 }
918
919
920 /**
921  * Execute mutations that were delayed on a set because of
922  * pending operations.
923  *
924  * @param set the set to execute mutations on
925  */
926 static void
927 execute_delayed_mutations (struct Set *set)
928 {
929   struct PendingMutation *pm;
930
931   if (0 != set->content->iterator_count)
932     return; /* still cannot do this */
933   while (NULL != (pm = set->content->pending_mutations_head))
934   {
935     GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
936                                  set->content->pending_mutations_tail,
937                                  pm);
938     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939                 "Executing pending mutation on %p.\n",
940                 pm->set);
941     execute_mutation (pm->set,
942                       pm->msg);
943     GNUNET_free (pm->msg);
944     GNUNET_free (pm);
945   }
946 }
947
948
949 /**
950  * Send the next element of a set to the set's client.  The next element is given by
951  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
952  * are no more elements in the set.  The caller must ensure that the set's iterator is
953  * valid.
954  *
955  * The client will acknowledge each received element with a
956  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
957  * #handle_client_iter_ack() will then trigger the next transmission.
958  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
959  *
960  * @param set set that should send its next element to its client
961  */
962 static void
963 send_client_element (struct Set *set)
964 {
965   int ret;
966   struct ElementEntry *ee;
967   struct GNUNET_MQ_Envelope *ev;
968   struct GNUNET_SET_IterResponseMessage *msg;
969
970   GNUNET_assert (NULL != set->iter);
971   do {
972     ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
973                                                        NULL,
974                                                        (const void **) &ee);
975     if (GNUNET_NO == ret)
976     {
977       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978                   "Iteration on %p done.\n",
979                   set);
980       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
981       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
982       set->iter = NULL;
983       set->iteration_id++;
984       GNUNET_assert (set->content->iterator_count > 0);
985       set->content->iterator_count--;
986       execute_delayed_mutations (set);
987       GNUNET_MQ_send (set->cs->mq,
988                       ev);
989       return;
990     }
991     GNUNET_assert (NULL != ee);
992   } while (GNUNET_NO ==
993            is_element_of_generation (ee,
994                                      set->iter_generation,
995                                      set->excluded_generations,
996                                      set->excluded_generations_size));
997   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998               "Sending iteration element on %p.\n",
999               set);
1000   ev = GNUNET_MQ_msg_extra (msg,
1001                             ee->element.size,
1002                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1003   GNUNET_memcpy (&msg[1],
1004                  ee->element.data,
1005                  ee->element.size);
1006   msg->element_type = htons (ee->element.element_type);
1007   msg->iteration_id = htons (set->iteration_id);
1008   GNUNET_MQ_send (set->cs->mq,
1009                   ev);
1010 }
1011
1012
1013 /**
1014  * Called when a client wants to iterate the elements of a set.
1015  * Checks if we have a set associated with the client and if we
1016  * can right now start an iteration. If all checks out, starts
1017  * sending the elements of the set to the client.
1018  *
1019  * @param cls client that sent the message
1020  * @param m message sent by the client
1021  */
1022 static void
1023 handle_client_iterate (void *cls,
1024                        const struct GNUNET_MessageHeader *m)
1025 {
1026   struct ClientState *cs = cls;
1027   struct Set *set;
1028
1029   if (NULL == (set = cs->set))
1030   {
1031     /* attempt to iterate over a non existing set */
1032     GNUNET_break (0);
1033     GNUNET_SERVICE_client_drop (cs->client);
1034     return;
1035   }
1036   if (NULL != set->iter)
1037   {
1038     /* Only one concurrent iterate-action allowed per set */
1039     GNUNET_break (0);
1040     GNUNET_SERVICE_client_drop (cs->client);
1041     return;
1042   }
1043   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044               "Iterating set %p in gen %u with %u content elements\n",
1045               (void *) set,
1046               set->current_generation,
1047               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1048   GNUNET_SERVICE_client_continue (cs->client);
1049   set->content->iterator_count++;
1050   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1051   set->iter_generation = set->current_generation;
1052   send_client_element (set);
1053 }
1054
1055
1056 /**
1057  * Called when a client wants to create a new set.  This is typically
1058  * the first request from a client, and includes the type of set
1059  * operation to be performed.
1060  *
1061  * @param cls client that sent the message
1062  * @param m message sent by the client
1063  */
1064 static void
1065 handle_client_create_set (void *cls,
1066                           const struct GNUNET_SET_CreateMessage *msg)
1067 {
1068   struct ClientState *cs = cls;
1069   struct Set *set;
1070
1071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072               "Client created new set (operation %u)\n",
1073               (uint32_t) ntohl (msg->operation));
1074   if (NULL != cs->set)
1075   {
1076     /* There can only be one set per client */
1077     GNUNET_break (0);
1078     GNUNET_SERVICE_client_drop (cs->client);
1079     return;
1080   }
1081   set = GNUNET_new (struct Set);
1082   switch (ntohl (msg->operation))
1083   {
1084   case GNUNET_SET_OPERATION_INTERSECTION:
1085     set->vt = _GSS_intersection_vt ();
1086     break;
1087   case GNUNET_SET_OPERATION_UNION:
1088     set->vt = _GSS_union_vt ();
1089     break;
1090   default:
1091     GNUNET_free (set);
1092     GNUNET_break (0);
1093     GNUNET_SERVICE_client_drop (cs->client);
1094     return;
1095   }
1096   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1097   set->state = set->vt->create ();
1098   if (NULL == set->state)
1099   {
1100     /* initialization failed (i.e. out of memory) */
1101     GNUNET_free (set);
1102     GNUNET_SERVICE_client_drop (cs->client);
1103     return;
1104   }
1105   set->content = GNUNET_new (struct SetContent);
1106   set->content->refcount = 1;
1107   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1108                                                                  GNUNET_YES);
1109   set->cs = cs;
1110   cs->set = set;
1111   GNUNET_SERVICE_client_continue (cs->client);
1112 }
1113
1114
1115 /**
1116  * Timeout happens iff:
1117  *  - we suggested an operation to our listener,
1118  *    but did not receive a response in time
1119  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1120  *
1121  * @param cls channel context
1122  * @param tc context information (why was this task triggered now)
1123  */
1124 static void
1125 incoming_timeout_cb (void *cls)
1126 {
1127   struct Operation *op = cls;
1128
1129   op->timeout_task = NULL;
1130   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131               "Remote peer's incoming request timed out\n");
1132   incoming_destroy (op);
1133 }
1134
1135
1136 /**
1137  * Method called whenever another peer has added us to a channel the
1138  * other peer initiated.  Only called (once) upon reception of data
1139  * from a channel we listen on.
1140  *
1141  * The channel context represents the operation itself and gets added
1142  * to a DLL, from where it gets looked up when our local listener
1143  * client responds to a proposed/suggested operation or connects and
1144  * associates with this operation.
1145  *
1146  * @param cls closure
1147  * @param channel new handle to the channel
1148  * @param source peer that started the channel
1149  * @return initial channel context for the channel
1150  *         returns NULL on error
1151  */
1152 static void *
1153 channel_new_cb (void *cls,
1154                 struct GNUNET_CADET_Channel *channel,
1155                 const struct GNUNET_PeerIdentity *source)
1156 {
1157   struct Listener *listener = cls;
1158   struct Operation *op;
1159
1160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1161               "New incoming channel\n");
1162   op = GNUNET_new (struct Operation);
1163   op->listener = listener;
1164   op->peer = *source;
1165   op->channel = channel;
1166   op->mq = GNUNET_CADET_get_mq (op->channel);
1167   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1168                                        UINT32_MAX);
1169   op->timeout_task
1170     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1171                                     &incoming_timeout_cb,
1172                                     op);
1173   GNUNET_CONTAINER_DLL_insert (listener->op_head,
1174                                listener->op_tail,
1175                                op);
1176   return op;
1177 }
1178
1179
1180 /**
1181  * Function called whenever a channel is destroyed.  Should clean up
1182  * any associated state.  It must NOT call
1183  * GNUNET_CADET_channel_destroy() on the channel.
1184  *
1185  * The peer_disconnect function is part of a a virtual table set initially either
1186  * when a peer creates a new channel with us, or once we create
1187  * a new channel ourselves (evaluate).
1188  *
1189  * Once we know the exact type of operation (union/intersection), the vt is
1190  * replaced with an operation specific instance (_GSS_[op]_vt).
1191  *
1192  * @param channel_ctx place where local state associated
1193  *                   with the channel is stored
1194  * @param channel connection to the other end (henceforth invalid)
1195  */
1196 static void
1197 channel_end_cb (void *channel_ctx,
1198                 const struct GNUNET_CADET_Channel *channel)
1199 {
1200   struct Operation *op = channel_ctx;
1201
1202   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203               "channel_end_cb called\n");
1204   op->channel = NULL;
1205   if (NULL != op->listener)
1206     incoming_destroy (op);
1207   else if (NULL != op->set)
1208     op->set->vt->channel_death (op);
1209   else
1210     _GSS_operation_destroy (op,
1211                             GNUNET_YES);
1212   GNUNET_free (op);
1213 }
1214
1215
1216 /**
1217  * Function called whenever an MQ-channel's transmission window size changes.
1218  *
1219  * The first callback in an outgoing channel will be with a non-zero value
1220  * and will mean the channel is connected to the destination.
1221  *
1222  * For an incoming channel it will be called immediately after the
1223  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1224  *
1225  * @param cls Channel closure.
1226  * @param channel Connection to the other end (henceforth invalid).
1227  * @param window_size New window size. If the is more messages than buffer size
1228  *                    this value will be negative..
1229  */
1230 static void
1231 channel_window_cb (void *cls,
1232                    const struct GNUNET_CADET_Channel *channel,
1233                    int window_size)
1234 {
1235   /* FIXME: not implemented, we could do flow control here... */
1236 }
1237
1238
1239 /**
1240  * Called when a client wants to create a new listener.
1241  *
1242  * @param cls client that sent the message
1243  * @param msg message sent by the client
1244  */
1245 static void
1246 handle_client_listen (void *cls,
1247                       const struct GNUNET_SET_ListenMessage *msg)
1248 {
1249   struct ClientState *cs = cls;
1250   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1251     GNUNET_MQ_hd_var_size (incoming_msg,
1252                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1253                            struct OperationRequestMessage,
1254                            NULL),
1255     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1256                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1257                            struct IBFMessage,
1258                            NULL),
1259     GNUNET_MQ_hd_var_size (union_p2p_elements,
1260                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1261                            struct GNUNET_SET_ElementMessage,
1262                            NULL),
1263     GNUNET_MQ_hd_var_size (union_p2p_offer,
1264                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1265                            struct GNUNET_MessageHeader,
1266                            NULL),
1267     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1268                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1269                            struct InquiryMessage,
1270                            NULL),
1271     GNUNET_MQ_hd_var_size (union_p2p_demand,
1272                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1273                            struct GNUNET_MessageHeader,
1274                            NULL),
1275     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1276                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1277                              struct GNUNET_MessageHeader,
1278                              NULL),
1279     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1280                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1281                              struct GNUNET_MessageHeader,
1282                              NULL),
1283     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1284                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1285                              struct GNUNET_MessageHeader,
1286                              NULL),
1287     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1288                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1289                              struct GNUNET_MessageHeader,
1290                              NULL),
1291     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1292                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1293                            struct StrataEstimatorMessage,
1294                            NULL),
1295     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1296                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1297                            struct StrataEstimatorMessage,
1298                            NULL),
1299     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1300                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1301                            struct GNUNET_SET_ElementMessage,
1302                            NULL),
1303     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1304                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1305                              struct IntersectionElementInfoMessage,
1306                              NULL),
1307     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1308                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1309                            struct BFMessage,
1310                            NULL),
1311     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1312                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1313                              struct IntersectionDoneMessage,
1314                              NULL),
1315     GNUNET_MQ_handler_end ()
1316   };
1317   struct Listener *listener;
1318
1319   if (NULL != cs->listener)
1320   {
1321     /* max. one active listener per client! */
1322     GNUNET_break (0);
1323     GNUNET_SERVICE_client_drop (cs->client);
1324     return;
1325   }
1326   listener = GNUNET_new (struct Listener);
1327   listener->cs = cs;
1328   cs->listener = listener;
1329   listener->app_id = msg->app_id;
1330   listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1331   GNUNET_CONTAINER_DLL_insert (listener_head,
1332                                listener_tail,
1333                                listener);
1334   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1335               "New listener created (op %u, port %s)\n",
1336               listener->operation,
1337               GNUNET_h2s (&listener->app_id));
1338   listener->open_port
1339     = GNUNET_CADET_open_port (cadet,
1340                               &msg->app_id,
1341                               &channel_new_cb,
1342                               listener,
1343                               &channel_window_cb,
1344                               &channel_end_cb,
1345                               cadet_handlers);
1346   GNUNET_SERVICE_client_continue (cs->client);
1347 }
1348
1349
1350 /**
1351  * Called when the listening client rejects an operation
1352  * request by another peer.
1353  *
1354  * @param cls client that sent the message
1355  * @param msg message sent by the client
1356  */
1357 static void
1358 handle_client_reject (void *cls,
1359                       const struct GNUNET_SET_RejectMessage *msg)
1360 {
1361   struct ClientState *cs = cls;
1362   struct Operation *op;
1363
1364   op = get_incoming (ntohl (msg->accept_reject_id));
1365   if (NULL == op)
1366   {
1367     /* no matching incoming operation for this reject;
1368        could be that the other peer already disconnected... */
1369     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1370                 "Client rejected unknown operation %u\n",
1371                 (unsigned int) ntohl (msg->accept_reject_id));
1372     GNUNET_SERVICE_client_continue (cs->client);
1373     return;
1374   }
1375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1376               "Peer request (op %u, app %s) rejected by client\n",
1377               op->listener->operation,
1378               GNUNET_h2s (&cs->listener->app_id));
1379   GNUNET_CADET_channel_destroy (op->channel);
1380   GNUNET_SERVICE_client_continue (cs->client);
1381 }
1382
1383
1384 /**
1385  * Called when a client wants to add or remove an element to a set it inhabits.
1386  *
1387  * @param cls client that sent the message
1388  * @param msg message sent by the client
1389  */
1390 static int
1391 check_client_mutation (void *cls,
1392                        const struct GNUNET_SET_ElementMessage *msg)
1393 {
1394   /* NOTE: Technically, we should probably check with the
1395      block library whether the element we are given is well-formed */
1396   return GNUNET_OK;
1397 }
1398
1399
1400 /**
1401  * Called when a client wants to add or remove an element to a set it inhabits.
1402  *
1403  * @param cls client that sent the message
1404  * @param msg message sent by the client
1405  */
1406 static void
1407 handle_client_mutation (void *cls,
1408                         const struct GNUNET_SET_ElementMessage *msg)
1409 {
1410   struct ClientState *cs = cls;
1411   struct Set *set;
1412
1413   if (NULL == (set = cs->set))
1414   {
1415     /* client without a set requested an operation */
1416     GNUNET_break (0);
1417     GNUNET_SERVICE_client_drop (cs->client);
1418     return;
1419   }
1420   GNUNET_SERVICE_client_continue (cs->client);
1421
1422   if (0 != set->content->iterator_count)
1423   {
1424     struct PendingMutation *pm;
1425
1426     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427                 "Scheduling mutation on set\n");
1428     pm = GNUNET_new (struct PendingMutation);
1429     pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1430     pm->set = set;
1431     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1432                                       set->content->pending_mutations_tail,
1433                                       pm);
1434     return;
1435   }
1436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437               "Executing mutation on set\n");
1438   execute_mutation (set,
1439                     msg);
1440 }
1441
1442
1443 /**
1444  * Advance the current generation of a set,
1445  * adding exclusion ranges if necessary.
1446  *
1447  * @param set the set where we want to advance the generation
1448  */
1449 static void
1450 advance_generation (struct Set *set)
1451 {
1452   struct GenerationRange r;
1453
1454   if (set->current_generation == set->content->latest_generation)
1455   {
1456     set->content->latest_generation++;
1457     set->current_generation++;
1458     return;
1459   }
1460
1461   GNUNET_assert (set->current_generation < set->content->latest_generation);
1462
1463   r.start = set->current_generation + 1;
1464   r.end = set->content->latest_generation + 1;
1465   set->content->latest_generation = r.end;
1466   set->current_generation = r.end;
1467   GNUNET_array_append (set->excluded_generations,
1468                        set->excluded_generations_size,
1469                        r);
1470 }
1471
1472
1473 /**
1474  * Called when a client wants to initiate a set operation with another
1475  * peer.  Initiates the CADET connection to the listener and sends the
1476  * request.
1477  *
1478  * @param cls client that sent the message
1479  * @param msg message sent by the client
1480  * @return #GNUNET_OK if the message is well-formed
1481  */
1482 static int
1483 check_client_evaluate (void *cls,
1484                         const struct GNUNET_SET_EvaluateMessage *msg)
1485 {
1486   /* FIXME: suboptimal, even if the context below could be NULL,
1487      there are malformed messages this does not check for... */
1488   return GNUNET_OK;
1489 }
1490
1491
1492 /**
1493  * Called when a client wants to initiate a set operation with another
1494  * peer.  Initiates the CADET connection to the listener and sends the
1495  * request.
1496  *
1497  * @param cls client that sent the message
1498  * @param msg message sent by the client
1499  */
1500 static void
1501 handle_client_evaluate (void *cls,
1502                         const struct GNUNET_SET_EvaluateMessage *msg)
1503 {
1504   struct ClientState *cs = cls;
1505   struct Operation *op = GNUNET_new (struct Operation);
1506   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1507     GNUNET_MQ_hd_var_size (incoming_msg,
1508                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1509                            struct OperationRequestMessage,
1510                            op),
1511     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1512                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1513                            struct IBFMessage,
1514                            op),
1515     GNUNET_MQ_hd_var_size (union_p2p_elements,
1516                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1517                            struct GNUNET_SET_ElementMessage,
1518                            op),
1519     GNUNET_MQ_hd_var_size (union_p2p_offer,
1520                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1521                            struct GNUNET_MessageHeader,
1522                            op),
1523     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1524                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1525                            struct InquiryMessage,
1526                            op),
1527     GNUNET_MQ_hd_var_size (union_p2p_demand,
1528                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1529                            struct GNUNET_MessageHeader,
1530                            op),
1531     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1532                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1533                              struct GNUNET_MessageHeader,
1534                              op),
1535     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1536                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1537                              struct GNUNET_MessageHeader,
1538                              op),
1539     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1540                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1541                              struct GNUNET_MessageHeader,
1542                              op),
1543     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1544                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1545                              struct GNUNET_MessageHeader,
1546                              op),
1547     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1548                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1549                            struct StrataEstimatorMessage,
1550                            op),
1551     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1552                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1553                            struct StrataEstimatorMessage,
1554                            op),
1555     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1556                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1557                            struct GNUNET_SET_ElementMessage,
1558                            op),
1559     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1560                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1561                              struct IntersectionElementInfoMessage,
1562                              op),
1563     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1564                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1565                            struct BFMessage,
1566                            op),
1567     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1568                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1569                              struct IntersectionDoneMessage,
1570                              op),
1571     GNUNET_MQ_handler_end ()
1572   };
1573   struct Set *set;
1574   const struct GNUNET_MessageHeader *context;
1575
1576   if (NULL == (set = cs->set))
1577   {
1578     GNUNET_break (0);
1579     GNUNET_free (op);
1580     GNUNET_SERVICE_client_drop (cs->client);
1581     return;
1582   }
1583   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1584                                        UINT32_MAX);
1585   op->peer = msg->target_peer;
1586   op->result_mode = ntohl (msg->result_mode);
1587   op->client_request_id = ntohl (msg->request_id);
1588   op->byzantine = msg->byzantine;
1589   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1590   op->force_full = msg->force_full;
1591   op->force_delta = msg->force_delta;
1592   context = GNUNET_MQ_extract_nested_mh (msg);
1593
1594   /* Advance generation values, so that
1595      mutations won't interfer with the running operation. */
1596   op->set = set;
1597   op->generation_created = set->current_generation;
1598   advance_generation (set);
1599   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1600                                set->ops_tail,
1601                                op);
1602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603               "Creating new CADET channel to port %s for set operation type %u\n",
1604               GNUNET_h2s (&msg->app_id),
1605               set->operation);
1606   op->channel = GNUNET_CADET_channel_create (cadet,
1607                                              op,
1608                                              &msg->target_peer,
1609                                              &msg->app_id,
1610                                              GNUNET_CADET_OPTION_RELIABLE,
1611                                              &channel_window_cb,
1612                                              &channel_end_cb,
1613                                              cadet_handlers);
1614   op->mq = GNUNET_CADET_get_mq (op->channel);
1615   op->state = set->vt->evaluate (op,
1616                                  context);
1617   if (NULL == op->state)
1618   {
1619     GNUNET_break (0);
1620     GNUNET_SERVICE_client_drop (cs->client);
1621     return;
1622   }
1623   GNUNET_SERVICE_client_continue (cs->client);
1624 }
1625
1626
1627 /**
1628  * Handle an ack from a client, and send the next element. Note
1629  * that we only expect acks for set elements, not after the
1630  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1631  *
1632  * @param cls client the client
1633  * @param ack the message
1634  */
1635 static void
1636 handle_client_iter_ack (void *cls,
1637                         const struct GNUNET_SET_IterAckMessage *ack)
1638 {
1639   struct ClientState *cs = cls;
1640   struct Set *set;
1641
1642   if (NULL == (set = cs->set))
1643   {
1644     /* client without a set acknowledged receiving a value */
1645     GNUNET_break (0);
1646     GNUNET_SERVICE_client_drop (cs->client);
1647     return;
1648   }
1649   if (NULL == set->iter)
1650   {
1651     /* client sent an ack, but we were not expecting one (as
1652        set iteration has finished) */
1653     GNUNET_break (0);
1654     GNUNET_SERVICE_client_drop (cs->client);
1655     return;
1656   }
1657   GNUNET_SERVICE_client_continue (cs->client);
1658   if (ntohl (ack->send_more))
1659   {
1660     send_client_element (set);
1661   }
1662   else
1663   {
1664     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1665     set->iter = NULL;
1666     set->iteration_id++;
1667   }
1668 }
1669
1670
1671 /**
1672  * Handle a request from the client to copy a set.
1673  *
1674  * @param cls the client
1675  * @param mh the message
1676  */
1677 static void
1678 handle_client_copy_lazy_prepare (void *cls,
1679                                  const struct GNUNET_MessageHeader *mh)
1680 {
1681   struct ClientState *cs = cls;
1682   struct Set *set;
1683   struct LazyCopyRequest *cr;
1684   struct GNUNET_MQ_Envelope *ev;
1685   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1686
1687   if (NULL == (set = cs->set))
1688   {
1689     /* client without a set requested an operation */
1690     GNUNET_break (0);
1691     GNUNET_SERVICE_client_drop (cs->client);
1692     return;
1693   }
1694   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1695               "Client requested creation of lazy copy\n");
1696   cr = GNUNET_new (struct LazyCopyRequest);
1697   cr->cookie = ++lazy_copy_cookie;
1698   cr->source_set = set;
1699   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1700                                lazy_copy_tail,
1701                                cr);
1702   ev = GNUNET_MQ_msg (resp_msg,
1703                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1704   resp_msg->cookie = cr->cookie;
1705   GNUNET_MQ_send (set->cs->mq,
1706                   ev);
1707   GNUNET_SERVICE_client_continue (cs->client);
1708 }
1709
1710
1711 /**
1712  * Handle a request from the client to connect to a copy of a set.
1713  *
1714  * @param cls the client
1715  * @param msg the message
1716  */
1717 static void
1718 handle_client_copy_lazy_connect (void *cls,
1719                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1720 {
1721   struct ClientState *cs = cls;
1722   struct LazyCopyRequest *cr;
1723   struct Set *set;
1724   int found;
1725
1726   if (NULL != cs->set)
1727   {
1728     /* There can only be one set per client */
1729     GNUNET_break (0);
1730     GNUNET_SERVICE_client_drop (cs->client);
1731     return;
1732   }
1733   found = GNUNET_NO;
1734   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1735   {
1736     if (cr->cookie == msg->cookie)
1737     {
1738       found = GNUNET_YES;
1739       break;
1740     }
1741   }
1742   if (GNUNET_NO == found)
1743   {
1744     /* client asked for copy with cookie we don't know */
1745     GNUNET_break (0);
1746     GNUNET_SERVICE_client_drop (cs->client);
1747     return;
1748   }
1749   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1750                                lazy_copy_tail,
1751                                cr);
1752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1753               "Client %p requested use of lazy copy\n",
1754               cs);
1755   set = GNUNET_new (struct Set);
1756   switch (cr->source_set->operation)
1757   {
1758   case GNUNET_SET_OPERATION_INTERSECTION:
1759     set->vt = _GSS_intersection_vt ();
1760     break;
1761   case GNUNET_SET_OPERATION_UNION:
1762     set->vt = _GSS_union_vt ();
1763     break;
1764   default:
1765     GNUNET_assert (0);
1766     return;
1767   }
1768
1769   if (NULL == set->vt->copy_state)
1770   {
1771     /* Lazy copy not supported for this set operation */
1772     GNUNET_break (0);
1773     GNUNET_free (set);
1774     GNUNET_free (cr);
1775     GNUNET_SERVICE_client_drop (cs->client);
1776     return;
1777   }
1778
1779   set->operation = cr->source_set->operation;
1780   set->state = set->vt->copy_state (cr->source_set->state);
1781   set->content = cr->source_set->content;
1782   set->content->refcount++;
1783
1784   set->current_generation = cr->source_set->current_generation;
1785   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1786   set->excluded_generations
1787     = GNUNET_memdup (cr->source_set->excluded_generations,
1788                      set->excluded_generations_size * sizeof (struct GenerationRange));
1789
1790   /* Advance the generation of the new set, so that mutations to the
1791      of the cloned set and the source set are independent. */
1792   advance_generation (set);
1793   set->cs = cs;
1794   cs->set = set;
1795   GNUNET_free (cr);
1796   GNUNET_SERVICE_client_continue (cs->client);
1797 }
1798
1799
1800 /**
1801  * Handle a request from the client to cancel a running set operation.
1802  *
1803  * @param cls the client
1804  * @param msg the message
1805  */
1806 static void
1807 handle_client_cancel (void *cls,
1808                       const struct GNUNET_SET_CancelMessage *msg)
1809 {
1810   struct ClientState *cs = cls;
1811   struct Set *set;
1812   struct Operation *op;
1813   int found;
1814
1815   if (NULL == (set = cs->set))
1816   {
1817     /* client without a set requested an operation */
1818     GNUNET_break (0);
1819     GNUNET_SERVICE_client_drop (cs->client);
1820     return;
1821   }
1822   found = GNUNET_NO;
1823   for (op = set->ops_head; NULL != op; op = op->next)
1824   {
1825     if (op->client_request_id == ntohl (msg->request_id))
1826     {
1827       found = GNUNET_YES;
1828       break;
1829     }
1830   }
1831   if (GNUNET_NO == found)
1832   {
1833     /* It may happen that the operation was already destroyed due to
1834      * the other peer disconnecting.  The client may not know about this
1835      * yet and try to cancel the (just barely non-existent) operation.
1836      * So this is not a hard error.
1837      */
1838     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1839                 "Client canceled non-existent op %u\n",
1840                 (uint32_t) ntohl (msg->request_id));
1841   }
1842   else
1843   {
1844     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1845                 "Client requested cancel for op %u\n",
1846                 (uint32_t) ntohl (msg->request_id));
1847     _GSS_operation_destroy (op,
1848                             GNUNET_YES);
1849   }
1850   GNUNET_SERVICE_client_continue (cs->client);
1851 }
1852
1853
1854 /**
1855  * Handle a request from the client to accept a set operation that
1856  * came from a remote peer.  We forward the accept to the associated
1857  * operation for handling
1858  *
1859  * @param cls the client
1860  * @param msg the message
1861  */
1862 static void
1863 handle_client_accept (void *cls,
1864                       const struct GNUNET_SET_AcceptMessage *msg)
1865 {
1866   struct ClientState *cs = cls;
1867   struct Set *set;
1868   struct Operation *op;
1869   struct GNUNET_SET_ResultMessage *result_message;
1870   struct GNUNET_MQ_Envelope *ev;
1871   struct Listener *listener;
1872
1873   if (NULL == (set = cs->set))
1874   {
1875     /* client without a set requested to accept */
1876     GNUNET_break (0);
1877     GNUNET_SERVICE_client_drop (cs->client);
1878     return;
1879   }
1880   op = get_incoming (ntohl (msg->accept_reject_id));
1881   if (NULL == op)
1882   {
1883     /* It is not an error if the set op does not exist -- it may
1884      * have been destroyed when the partner peer disconnected. */
1885     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1886                 "Client %p accepted request %u of listener %p that is no longer active\n",
1887                 cs,
1888                 ntohl (msg->accept_reject_id),
1889                 cs->listener);
1890     ev = GNUNET_MQ_msg (result_message,
1891                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1892     result_message->request_id = msg->request_id;
1893     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1894     GNUNET_MQ_send (set->cs->mq,
1895                     ev);
1896     GNUNET_SERVICE_client_continue (cs->client);
1897     return;
1898   }
1899   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1900               "Client accepting request %u\n",
1901               (uint32_t) ntohl (msg->accept_reject_id));
1902   listener = op->listener;
1903   op->listener = NULL;
1904   GNUNET_CONTAINER_DLL_remove (listener->op_head,
1905                                listener->op_tail,
1906                                op);
1907   op->set = set;
1908   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1909                                set->ops_tail,
1910                                op);
1911   op->client_request_id = ntohl (msg->request_id);
1912   op->result_mode = ntohl (msg->result_mode);
1913   op->byzantine = msg->byzantine;
1914   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1915   op->force_full = msg->force_full;
1916   op->force_delta = msg->force_delta;
1917
1918   /* Advance generation values, so that future mutations do not
1919      interfer with the running operation. */
1920   op->generation_created = set->current_generation;
1921   advance_generation (set);
1922   GNUNET_assert (NULL == op->state);
1923   op->state = set->vt->accept (op);
1924   if (NULL == op->state)
1925   {
1926     GNUNET_break (0);
1927     GNUNET_SERVICE_client_drop (cs->client);
1928     return;
1929   }
1930   /* Now allow CADET to continue, as we did not do this in
1931      #handle_incoming_msg (as we wanted to first see if the
1932      local client would accept the request). */
1933   GNUNET_CADET_receive_done (op->channel);
1934   GNUNET_SERVICE_client_continue (cs->client);
1935 }
1936
1937
1938 /**
1939  * Called to clean up, after a shutdown has been requested.
1940  *
1941  * @param cls closure, NULL
1942  */
1943 static void
1944 shutdown_task (void *cls)
1945 {
1946   /* Delay actual shutdown to allow service to disconnect clients */
1947   in_shutdown = GNUNET_YES;
1948   if (0 == num_clients)
1949   {
1950     if (NULL != cadet)
1951     {
1952       GNUNET_CADET_disconnect (cadet);
1953       cadet = NULL;
1954     }
1955   }
1956   GNUNET_STATISTICS_destroy (_GSS_statistics,
1957                              GNUNET_YES);
1958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1959               "handled shutdown request\n");
1960 }
1961
1962
1963 /**
1964  * Function called by the service's run
1965  * method to run service-specific setup code.
1966  *
1967  * @param cls closure
1968  * @param cfg configuration to use
1969  * @param service the initialized service
1970  */
1971 static void
1972 run (void *cls,
1973      const struct GNUNET_CONFIGURATION_Handle *cfg,
1974      struct GNUNET_SERVICE_Handle *service)
1975 {
1976   /* FIXME: need to modify SERVICE (!) API to allow
1977      us to run a shutdown task *after* clients were
1978      forcefully disconnected! */
1979   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1980                                  NULL);
1981   _GSS_statistics = GNUNET_STATISTICS_create ("set",
1982                                               cfg);
1983   cadet = GNUNET_CADET_connect (cfg);
1984   if (NULL == cadet)
1985   {
1986     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1987                 _("Could not connect to CADET service\n"));
1988     GNUNET_SCHEDULER_shutdown ();
1989     return;
1990   }
1991 }
1992
1993
1994 /**
1995  * Define "main" method using service macro.
1996  */
1997 GNUNET_SERVICE_MAIN
1998 ("set",
1999  GNUNET_SERVICE_OPTION_NONE,
2000  &run,
2001  &client_connect_cb,
2002  &client_disconnect_cb,
2003  NULL,
2004  GNUNET_MQ_hd_fixed_size (client_accept,
2005                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2006                           struct GNUNET_SET_AcceptMessage,
2007                           NULL),
2008  GNUNET_MQ_hd_fixed_size (client_iter_ack,
2009                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2010                           struct GNUNET_SET_IterAckMessage,
2011                           NULL),
2012  GNUNET_MQ_hd_var_size (client_mutation,
2013                         GNUNET_MESSAGE_TYPE_SET_ADD,
2014                         struct GNUNET_SET_ElementMessage,
2015                         NULL),
2016  GNUNET_MQ_hd_fixed_size (client_create_set,
2017                           GNUNET_MESSAGE_TYPE_SET_CREATE,
2018                           struct GNUNET_SET_CreateMessage,
2019                           NULL),
2020  GNUNET_MQ_hd_fixed_size (client_iterate,
2021                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2022                           struct GNUNET_MessageHeader,
2023                           NULL),
2024  GNUNET_MQ_hd_var_size (client_evaluate,
2025                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2026                         struct GNUNET_SET_EvaluateMessage,
2027                         NULL),
2028  GNUNET_MQ_hd_fixed_size (client_listen,
2029                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
2030                           struct GNUNET_SET_ListenMessage,
2031                           NULL),
2032  GNUNET_MQ_hd_fixed_size (client_reject,
2033                           GNUNET_MESSAGE_TYPE_SET_REJECT,
2034                           struct GNUNET_SET_RejectMessage,
2035                           NULL),
2036  GNUNET_MQ_hd_var_size (client_mutation,
2037                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
2038                         struct GNUNET_SET_ElementMessage,
2039                         NULL),
2040  GNUNET_MQ_hd_fixed_size (client_cancel,
2041                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2042                           struct GNUNET_SET_CancelMessage,
2043                           NULL),
2044  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2045                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2046                           struct GNUNET_MessageHeader,
2047                           NULL),
2048  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2049                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2050                           struct GNUNET_SET_CopyLazyConnectMessage,
2051                           NULL),
2052  GNUNET_MQ_handler_end ());
2053
2054
2055 /* end of gnunet-service-set.c */