12af653c1b3f33377e4f3598db9d0824abcb39f4
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
31
32 /**
33  * How long do we hold on to an incoming channel if there is
34  * no local listener before giving up?
35  */
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38
39 /**
40  * Lazy copy requests made by a client.
41  */
42 struct LazyCopyRequest
43 {
44   /**
45    * Kept in a DLL.
46    */
47   struct LazyCopyRequest *prev;
48
49   /**
50    * Kept in a DLL.
51    */
52   struct LazyCopyRequest *next;
53
54   /**
55    * Which set are we supposed to copy?
56    */
57   struct Set *source_set;
58
59   /**
60    * Cookie identifying the request.
61    */
62   uint32_t cookie;
63
64 };
65
66
67 /**
68  * A listener is inhabited by a client, and waits for evaluation
69  * requests from remote peers.
70  */
71 struct Listener
72 {
73   /**
74    * Listeners are held in a doubly linked list.
75    */
76   struct Listener *next;
77
78   /**
79    * Listeners are held in a doubly linked list.
80    */
81   struct Listener *prev;
82
83   /**
84    * Head of DLL of operations this listener is responsible for.
85    * Once the client has accepted/declined the operation, the
86    * operation is moved to the respective set's operation DLLS.
87    */
88   struct Operation *op_head;
89
90   /**
91    * Tail of DLL of operations this listener is responsible for.
92    * Once the client has accepted/declined the operation, the
93    * operation is moved to the respective set's operation DLLS.
94    */
95   struct Operation *op_tail;
96
97   /**
98    * Client that owns the listener.
99    * Only one client may own a listener.
100    */
101   struct ClientState *cs;
102
103   /**
104    * The port we are listening on with CADET.
105    */
106   struct GNUNET_CADET_Port *open_port;
107
108   /**
109    * Application ID for the operation, used to distinguish
110    * multiple operations of the same type with the same peer.
111    */
112   struct GNUNET_HashCode app_id;
113
114   /**
115    * The type of the operation.
116    */
117   enum GNUNET_SET_OperationType operation;
118 };
119
120
121 /**
122  * Handle to the cadet service, used to listen for and connect to
123  * remote peers.
124  */
125 static struct GNUNET_CADET_Handle *cadet;
126
127 /**
128  * DLL of lazy copy requests by this client.
129  */
130 static struct LazyCopyRequest *lazy_copy_head;
131
132 /**
133  * DLL of lazy copy requests by this client.
134  */
135 static struct LazyCopyRequest *lazy_copy_tail;
136
137 /**
138  * Generator for unique cookie we set per lazy copy request.
139  */
140 static uint32_t lazy_copy_cookie;
141
142 /**
143  * Statistics handle.
144  */
145 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
146
147 /**
148  * Listeners are held in a doubly linked list.
149  */
150 static struct Listener *listener_head;
151
152 /**
153  * Listeners are held in a doubly linked list.
154  */
155 static struct Listener *listener_tail;
156
157 /**
158  * Counter for allocating unique IDs for clients, used to identify
159  * incoming operation requests from remote peers, that the client can
160  * choose to accept or refuse.  0 must not be used (reserved for
161  * uninitialized).
162  */
163 static uint32_t suggest_id;
164
165
166 /**
167  * Get the incoming socket associated with the given id.
168  *
169  * @param listener the listener to look in
170  * @param id id to look for
171  * @return the incoming socket associated with the id,
172  *         or NULL if there is none
173  */
174 static struct Operation *
175 get_incoming (uint32_t id)
176 {
177   for (struct Listener *listener = listener_head;
178        NULL != listener;
179        listener = listener->next)
180   {
181     for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
182       if (op->suggest_id == id)
183         return op;
184   }
185   return NULL;
186 }
187
188
189 /**
190  * Destroy an incoming request from a remote peer
191  *
192  * @param op remote request to destroy
193  */
194 static void
195 incoming_destroy (struct Operation *op)
196 {
197   struct Listener *listener;
198   struct GNUNET_CADET_Channel *channel;
199
200   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
201               "Destroying incoming operation %p\n",
202               op);
203   if (NULL != (listener = op->listener))
204   {
205     GNUNET_CONTAINER_DLL_remove (listener->op_head,
206                                  listener->op_tail,
207                                  op);
208     op->listener = NULL;
209   }
210   if (NULL != op->timeout_task)
211   {
212     GNUNET_SCHEDULER_cancel (op->timeout_task);
213     op->timeout_task = NULL;
214   }
215   if (NULL != (channel = op->channel))
216   {
217     op->channel = NULL;
218     GNUNET_CADET_channel_destroy (channel);
219   }
220 }
221
222
223 /**
224  * Context for the #garbage_collect_cb().
225  */
226 struct GarbageContext
227 {
228
229   /**
230    * Map for which we are garbage collecting removed elements.
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *map;
233
234   /**
235    * Lowest generation for which an operation is still pending.
236    */
237   unsigned int min_op_generation;
238
239   /**
240    * Largest generation for which an operation is still pending.
241    */
242   unsigned int max_op_generation;
243
244 };
245
246
247 /**
248  * Function invoked to check if an element can be removed from
249  * the set's history because it is no longer needed.
250  *
251  * @param cls the `struct GarbageContext *`
252  * @param key key of the element in the map
253  * @param value the `struct ElementEntry *`
254  * @return #GNUNET_OK (continue to iterate)
255  */
256 static int
257 garbage_collect_cb (void *cls,
258                     const struct GNUNET_HashCode *key,
259                     void *value)
260 {
261   //struct GarbageContext *gc = cls;
262   //struct ElementEntry *ee = value;
263
264   //if (GNUNET_YES != ee->removed)
265   //  return GNUNET_OK;
266   //if ( (gc->max_op_generation < ee->generation_added) ||
267   //     (ee->generation_removed > gc->min_op_generation) )
268   //{
269   //  GNUNET_assert (GNUNET_YES ==
270   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
271   //                                                       key,
272   //                                                       ee));
273   //  GNUNET_free (ee);
274   //}
275   return GNUNET_OK;
276 }
277
278
279 /**
280  * Collect and destroy elements that are not needed anymore, because
281  * their lifetime (as determined by their generation) does not overlap
282  * with any active set operation.
283  *
284  * @param set set to garbage collect
285  */
286 static void
287 collect_generation_garbage (struct Set *set)
288 {
289   struct GarbageContext gc;
290
291   gc.min_op_generation = UINT_MAX;
292   gc.max_op_generation = 0;
293   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
294   {
295     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
296                                        op->generation_created);
297     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
298                                        op->generation_created);
299   }
300   gc.map = set->content->elements;
301   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
302                                          &garbage_collect_cb,
303                                          &gc);
304 }
305
306
307 /**
308  * Is @a generation in the range of exclusions?
309  *
310  * @param generation generation to query
311  * @param excluded array of generations where the element is excluded
312  * @param excluded_size length of the @a excluded array
313  * @return #GNUNET_YES if @a generation is in any of the ranges
314  */
315 static int
316 is_excluded_generation (unsigned int generation,
317                         struct GenerationRange *excluded,
318                         unsigned int excluded_size)
319 {
320   for (unsigned int i = 0; i < excluded_size; i++)
321     if ( (generation >= excluded[i].start) &&
322          (generation < excluded[i].end) )
323       return GNUNET_YES;
324   return GNUNET_NO;
325 }
326
327
328 /**
329  * Is element @a ee part of the set during @a query_generation?
330  *
331  * @param ee element to test
332  * @param query_generation generation to query
333  * @param excluded array of generations where the element is excluded
334  * @param excluded_size length of the @a excluded array
335  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
336  */
337 static int
338 is_element_of_generation (struct ElementEntry *ee,
339                           unsigned int query_generation,
340                           struct GenerationRange *excluded,
341                           unsigned int excluded_size)
342 {
343   struct MutationEvent *mut;
344   int is_present;
345
346   GNUNET_assert (NULL != ee->mutations);
347   if (GNUNET_YES ==
348       is_excluded_generation (query_generation,
349                               excluded,
350                               excluded_size))
351   {
352     GNUNET_break (0);
353     return GNUNET_NO;
354   }
355
356   is_present = GNUNET_NO;
357
358   /* Could be made faster with binary search, but lists
359      are small, so why bother. */
360   for (unsigned int i = 0; i < ee->mutations_size; i++)
361   {
362     mut = &ee->mutations[i];
363
364     if (mut->generation > query_generation)
365     {
366       /* The mutation doesn't apply to our generation
367          anymore.  We can'b break here, since mutations aren't
368          sorted by generation. */
369       continue;
370     }
371
372     if (GNUNET_YES ==
373         is_excluded_generation (mut->generation,
374                                 excluded,
375                                 excluded_size))
376     {
377       /* The generation is excluded (because it belongs to another
378          fork via a lazy copy) and thus mutations aren't considered
379          for membership testing. */
380       continue;
381     }
382
383     /* This would be an inconsistency in how we manage mutations. */
384     if ( (GNUNET_YES == is_present) &&
385          (GNUNET_YES == mut->added) )
386       GNUNET_assert (0);
387     /* Likewise. */
388     if ( (GNUNET_NO == is_present) &&
389          (GNUNET_NO == mut->added) )
390       GNUNET_assert (0);
391
392     is_present = mut->added;
393   }
394
395   return is_present;
396 }
397
398
399 /**
400  * Is element @a ee part of the set used by @a op?
401  *
402  * @param ee element to test
403  * @param op operation the defines the set and its generation
404  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
405  */
406 int
407 _GSS_is_element_of_operation (struct ElementEntry *ee,
408                               struct Operation *op)
409 {
410   return is_element_of_generation (ee,
411                                    op->generation_created,
412                                    op->set->excluded_generations,
413                                    op->set->excluded_generations_size);
414 }
415
416
417 /**
418  * Destroy the given operation.  Used for any operation where both
419  * peers were known and that thus actually had a vt and channel.  Must
420  * not be used for operations where 'listener' is still set and we do
421  * not know the other peer.
422  *
423  * Call the implementation-specific cancel function of the operation.
424  * Disconnects from the remote peer.  Does not disconnect the client,
425  * as there may be multiple operations per set.
426  *
427  * @param op operation to destroy
428  * @param gc #GNUNET_YES to perform garbage collection on the set
429  */
430 void
431 _GSS_operation_destroy (struct Operation *op,
432                         int gc)
433 {
434   struct Set *set = op->set;
435   struct GNUNET_CADET_Channel *channel;
436
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438               "Destroying operation %p\n",
439               op);
440   GNUNET_assert (NULL == op->listener);
441   if (NULL != op->state)
442   {
443     set->vt->cancel (op);
444     op->state = NULL;
445   }
446   if (NULL != set)
447   {
448     GNUNET_CONTAINER_DLL_remove (set->ops_head,
449                                  set->ops_tail,
450                                  op);
451     op->set = NULL;
452   }
453   if (NULL != op->context_msg)
454   {
455     GNUNET_free (op->context_msg);
456     op->context_msg = NULL;
457   }
458   if (NULL != (channel = op->channel))
459   {
460     /* This will free op; called conditionally as this helper function
461        is also called from within the channel disconnect handler. */
462     op->channel = NULL;
463     GNUNET_CADET_channel_destroy (channel);
464   }
465   if ( (NULL != set) &&
466        (GNUNET_YES == gc) )
467     collect_generation_garbage (set);
468   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
469    * there was a channel end handler that will free 'op' on the call stack. */
470 }
471
472
473 /**
474  * Callback called when a client connects to the service.
475  *
476  * @param cls closure for the service
477  * @param c the new client that connected to the service
478  * @param mq the message queue used to send messages to the client
479  * @return @a `struct ClientState`
480  */
481 static void *
482 client_connect_cb (void *cls,
483                    struct GNUNET_SERVICE_Client *c,
484                    struct GNUNET_MQ_Handle *mq)
485 {
486   struct ClientState *cs;
487
488   cs = GNUNET_new (struct ClientState);
489   cs->client = c;
490   cs->mq = mq;
491   return cs;
492 }
493
494
495 /**
496  * Iterator over hash map entries to free element entries.
497  *
498  * @param cls closure
499  * @param key current key code
500  * @param value a `struct ElementEntry *` to be free'd
501  * @return #GNUNET_YES (continue to iterate)
502  */
503 static int
504 destroy_elements_iterator (void *cls,
505                            const struct GNUNET_HashCode *key,
506                            void *value)
507 {
508   struct ElementEntry *ee = value;
509
510   GNUNET_free_non_null (ee->mutations);
511   GNUNET_free (ee);
512   return GNUNET_YES;
513 }
514
515
516 /**
517  * Clean up after a client has disconnected
518  *
519  * @param cls closure, unused
520  * @param client the client to clean up after
521  * @param internal_cls the `struct ClientState`
522  */
523 static void
524 client_disconnect_cb (void *cls,
525                       struct GNUNET_SERVICE_Client *client,
526                       void *internal_cls)
527 {
528   struct ClientState *cs = internal_cls;
529   struct Operation *op;
530   struct Listener *listener;
531   struct Set *set;
532
533   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
534               "Client disconnected, cleaning up\n");
535   if (NULL != (set = cs->set))
536   {
537     struct SetContent *content = set->content;
538     struct PendingMutation *pm;
539     struct PendingMutation *pm_current;
540     struct LazyCopyRequest *lcr;
541
542     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543                 "Destroying client's set\n");
544     /* Destroy pending set operations */
545     while (NULL != set->ops_head)
546       _GSS_operation_destroy (set->ops_head,
547                               GNUNET_NO);
548
549     /* Destroy operation-specific state */
550     GNUNET_assert (NULL != set->state);
551     set->vt->destroy_set (set->state);
552     set->state = NULL;
553
554     /* Clean up ongoing iterations */
555     if (NULL != set->iter)
556     {
557       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
558       set->iter = NULL;
559       set->iteration_id++;
560     }
561
562     /* discard any pending mutations that reference this set */
563     pm = content->pending_mutations_head;
564     while (NULL != pm)
565     {
566       pm_current = pm;
567       pm = pm->next;
568       if (pm_current->set == set)
569       {
570         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
571                                      content->pending_mutations_tail,
572                                      pm_current);
573         GNUNET_free (pm_current);
574       }
575     }
576
577     /* free set content (or at least decrement RC) */
578     set->content = NULL;
579     GNUNET_assert (0 != content->refcount);
580     content->refcount--;
581     if (0 == content->refcount)
582     {
583       GNUNET_assert (NULL != content->elements);
584       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
585                                              &destroy_elements_iterator,
586                                              NULL);
587       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
588       content->elements = NULL;
589       GNUNET_free (content);
590     }
591     GNUNET_free_non_null (set->excluded_generations);
592     set->excluded_generations = NULL;
593
594     /* remove set from pending copy requests */
595     lcr = lazy_copy_head;
596     while (NULL != lcr)
597     {
598       struct LazyCopyRequest *lcr_current = lcr;
599
600       lcr = lcr->next;
601       if (lcr_current->source_set == set)
602       {
603         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
604                                      lazy_copy_tail,
605                                      lcr_current);
606         GNUNET_free (lcr_current);
607       }
608     }
609     GNUNET_free (set);
610   }
611
612   if (NULL != (listener = cs->listener))
613   {
614     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615                 "Destroying client's listener\n");
616     GNUNET_CADET_close_port (listener->open_port);
617     listener->open_port = NULL;
618     while (NULL != (op = listener->op_head))
619       incoming_destroy (op);
620     GNUNET_CONTAINER_DLL_remove (listener_head,
621                                  listener_tail,
622                                  listener);
623     GNUNET_free (listener);
624   }
625   GNUNET_free (cs);
626 }
627
628
629 /**
630  * Check a request for a set operation from another peer.
631  *
632  * @param cls the operation state
633  * @param msg the received message
634  * @return #GNUNET_OK if the channel should be kept alive,
635  *         #GNUNET_SYSERR to destroy the channel
636  */
637 static int
638 check_incoming_msg (void *cls,
639                     const struct OperationRequestMessage *msg)
640 {
641   struct Operation *op = cls;
642   struct Listener *listener = op->listener;
643   const struct GNUNET_MessageHeader *nested_context;
644
645   /* double operation request */
646   if (0 != op->suggest_id)
647   {
648     GNUNET_break_op (0);
649     return GNUNET_SYSERR;
650   }
651   /* This should be equivalent to the previous condition, but can't hurt to check twice */
652   if (NULL == op->listener)
653   {
654     GNUNET_break (0);
655     return GNUNET_SYSERR;
656   }
657   if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
658   {
659     GNUNET_break_op (0);
660     return GNUNET_SYSERR;
661   }
662   nested_context = GNUNET_MQ_extract_nested_mh (msg);
663   if ( (NULL != nested_context) &&
664        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
665   {
666     GNUNET_break_op (0);
667     return GNUNET_SYSERR;
668   }
669   return GNUNET_OK;
670 }
671
672
673 /**
674  * Handle a request for a set operation from another peer.  Checks if we
675  * have a listener waiting for such a request (and in that case initiates
676  * asking the listener about accepting the connection). If no listener
677  * is waiting, we queue the operation request in hope that a listener
678  * shows up soon (before timeout).
679  *
680  * This msg is expected as the first and only msg handled through the
681  * non-operation bound virtual table, acceptance of this operation replaces
682  * our virtual table and subsequent msgs would be routed differently (as
683  * we then know what type of operation this is).
684  *
685  * @param cls the operation state
686  * @param msg the received message
687  * @return #GNUNET_OK if the channel should be kept alive,
688  *         #GNUNET_SYSERR to destroy the channel
689  */
690 static void
691 handle_incoming_msg (void *cls,
692                      const struct OperationRequestMessage *msg)
693 {
694   struct Operation *op = cls;
695   struct Listener *listener = op->listener;
696   const struct GNUNET_MessageHeader *nested_context;
697   struct GNUNET_MQ_Envelope *env;
698   struct GNUNET_SET_RequestMessage *cmsg;
699
700   nested_context = GNUNET_MQ_extract_nested_mh (msg);
701   /* Make a copy of the nested_context (application-specific context
702      information that is opaque to set) so we can pass it to the
703      listener later on */
704   if (NULL != nested_context)
705     op->context_msg = GNUNET_copy_message (nested_context);
706   op->remote_element_count = ntohl (msg->element_count);
707   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
708               "Received P2P operation request (op %u, port %s) for active listener\n",
709               (uint32_t) ntohl (msg->operation),
710               GNUNET_h2s (&op->listener->app_id));
711   GNUNET_assert (0 == op->suggest_id);
712   if (0 == suggest_id)
713     suggest_id++;
714   op->suggest_id = suggest_id++;
715   GNUNET_assert (NULL != op->timeout_task);
716   GNUNET_SCHEDULER_cancel (op->timeout_task);
717   op->timeout_task = NULL;
718   env = GNUNET_MQ_msg_nested_mh (cmsg,
719                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
720                                  op->context_msg);
721   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722               "Suggesting incoming request with accept id %u to listener %p of client %p\n",
723               op->suggest_id,
724               listener,
725               listener->cs);
726   cmsg->accept_id = htonl (op->suggest_id);
727   cmsg->peer_id = op->peer;
728   GNUNET_MQ_send (listener->cs->mq,
729                   env);
730   /* NOTE: GNUNET_CADET_receive_done() will be called in
731      #handle_client_accept() */
732 }
733
734
735 /**
736  * Add an element to @a set as specified by @a msg
737  *
738  * @param set set to manipulate
739  * @param msg message specifying the change
740  */
741 static void
742 execute_add (struct Set *set,
743              const struct GNUNET_SET_ElementMessage *msg)
744 {
745   struct GNUNET_SET_Element el;
746   struct ElementEntry *ee;
747   struct GNUNET_HashCode hash;
748
749   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
750   el.size = ntohs (msg->header.size) - sizeof (*msg);
751   el.data = &msg[1];
752   el.element_type = ntohs (msg->element_type);
753   GNUNET_SET_element_hash (&el,
754                            &hash);
755   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
756                                           &hash);
757   if (NULL == ee)
758   {
759     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760                 "Client inserts element %s of size %u\n",
761                 GNUNET_h2s (&hash),
762                 el.size);
763     ee = GNUNET_malloc (el.size + sizeof (*ee));
764     ee->element.size = el.size;
765     GNUNET_memcpy (&ee[1],
766             el.data,
767             el.size);
768     ee->element.data = &ee[1];
769     ee->element.element_type = el.element_type;
770     ee->remote = GNUNET_NO;
771     ee->mutations = NULL;
772     ee->mutations_size = 0;
773     ee->element_hash = hash;
774     GNUNET_break (GNUNET_YES ==
775                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
776                                                      &ee->element_hash,
777                                                      ee,
778                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
779   }
780   else if (GNUNET_YES ==
781            is_element_of_generation (ee,
782                                      set->current_generation,
783                                      set->excluded_generations,
784                                      set->excluded_generations_size))
785   {
786     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787                 "Client inserted element %s of size %u twice (ignored)\n",
788                 GNUNET_h2s (&hash),
789                 el.size);
790
791     /* same element inserted twice */
792     return;
793   }
794
795   {
796     struct MutationEvent mut = {
797       .generation = set->current_generation,
798       .added = GNUNET_YES
799     };
800     GNUNET_array_append (ee->mutations,
801                          ee->mutations_size,
802                          mut);
803   }
804   set->vt->add (set->state,
805                 ee);
806 }
807
808
809 /**
810  * Remove an element from @a set as specified by @a msg
811  *
812  * @param set set to manipulate
813  * @param msg message specifying the change
814  */
815 static void
816 execute_remove (struct Set *set,
817                 const struct GNUNET_SET_ElementMessage *msg)
818 {
819   struct GNUNET_SET_Element el;
820   struct ElementEntry *ee;
821   struct GNUNET_HashCode hash;
822
823   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
824   el.size = ntohs (msg->header.size) - sizeof (*msg);
825   el.data = &msg[1];
826   el.element_type = ntohs (msg->element_type);
827   GNUNET_SET_element_hash (&el, &hash);
828   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
829                                           &hash);
830   if (NULL == ee)
831   {
832     /* Client tried to remove non-existing element. */
833     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834                 "Client removes non-existing element of size %u\n",
835                 el.size);
836     return;
837   }
838   if (GNUNET_NO ==
839       is_element_of_generation (ee,
840                                 set->current_generation,
841                                 set->excluded_generations,
842                                 set->excluded_generations_size))
843   {
844     /* Client tried to remove element twice */
845     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846                 "Client removed element of size %u twice (ignored)\n",
847                 el.size);
848     return;
849   }
850   else
851   {
852     struct MutationEvent mut = {
853       .generation = set->current_generation,
854       .added = GNUNET_NO
855     };
856
857     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858                 "Client removes element of size %u\n",
859                 el.size);
860
861     GNUNET_array_append (ee->mutations,
862                          ee->mutations_size,
863                          mut);
864   }
865   set->vt->remove (set->state,
866                    ee);
867 }
868
869
870 /**
871  * Perform a mutation on a set as specified by the @a msg
872  *
873  * @param set the set to mutate
874  * @param msg specification of what to change
875  */
876 static void
877 execute_mutation (struct Set *set,
878                   const struct GNUNET_SET_ElementMessage *msg)
879 {
880   switch (ntohs (msg->header.type))
881   {
882     case GNUNET_MESSAGE_TYPE_SET_ADD:
883       execute_add (set, msg);
884       break;
885     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
886       execute_remove (set, msg);
887       break;
888     default:
889       GNUNET_break (0);
890   }
891 }
892
893
894 /**
895  * Execute mutations that were delayed on a set because of
896  * pending operations.
897  *
898  * @param set the set to execute mutations on
899  */
900 static void
901 execute_delayed_mutations (struct Set *set)
902 {
903   struct PendingMutation *pm;
904
905   if (0 != set->content->iterator_count)
906     return; /* still cannot do this */
907   while (NULL != (pm = set->content->pending_mutations_head))
908   {
909     GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
910                                  set->content->pending_mutations_tail,
911                                  pm);
912     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913                 "Executing pending mutation on %p.\n",
914                 pm->set);
915     execute_mutation (pm->set,
916                       pm->msg);
917     GNUNET_free (pm->msg);
918     GNUNET_free (pm);
919   }
920 }
921
922
923 /**
924  * Send the next element of a set to the set's client.  The next element is given by
925  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
926  * are no more elements in the set.  The caller must ensure that the set's iterator is
927  * valid.
928  *
929  * The client will acknowledge each received element with a
930  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
931  * #handle_client_iter_ack() will then trigger the next transmission.
932  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
933  *
934  * @param set set that should send its next element to its client
935  */
936 static void
937 send_client_element (struct Set *set)
938 {
939   int ret;
940   struct ElementEntry *ee;
941   struct GNUNET_MQ_Envelope *ev;
942   struct GNUNET_SET_IterResponseMessage *msg;
943
944   GNUNET_assert (NULL != set->iter);
945   do {
946     ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
947                                                        NULL,
948                                                        (const void **) &ee);
949     if (GNUNET_NO == ret)
950     {
951       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952                   "Iteration on %p done.\n",
953                   set);
954       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
955       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
956       set->iter = NULL;
957       set->iteration_id++;
958       GNUNET_assert (set->content->iterator_count > 0);
959       set->content->iterator_count--;
960       execute_delayed_mutations (set);
961       GNUNET_MQ_send (set->cs->mq,
962                       ev);
963       return;
964     }
965     GNUNET_assert (NULL != ee);
966   } while (GNUNET_NO ==
967            is_element_of_generation (ee,
968                                      set->iter_generation,
969                                      set->excluded_generations,
970                                      set->excluded_generations_size));
971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972               "Sending iteration element on %p.\n",
973               set);
974   ev = GNUNET_MQ_msg_extra (msg,
975                             ee->element.size,
976                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
977   GNUNET_memcpy (&msg[1],
978                  ee->element.data,
979                  ee->element.size);
980   msg->element_type = htons (ee->element.element_type);
981   msg->iteration_id = htons (set->iteration_id);
982   GNUNET_MQ_send (set->cs->mq,
983                   ev);
984 }
985
986
987 /**
988  * Called when a client wants to iterate the elements of a set.
989  * Checks if we have a set associated with the client and if we
990  * can right now start an iteration. If all checks out, starts
991  * sending the elements of the set to the client.
992  *
993  * @param cls client that sent the message
994  * @param m message sent by the client
995  */
996 static void
997 handle_client_iterate (void *cls,
998                        const struct GNUNET_MessageHeader *m)
999 {
1000   struct ClientState *cs = cls;
1001   struct Set *set;
1002
1003   if (NULL == (set = cs->set))
1004   {
1005     /* attempt to iterate over a non existing set */
1006     GNUNET_break (0);
1007     GNUNET_SERVICE_client_drop (cs->client);
1008     return;
1009   }
1010   if (NULL != set->iter)
1011   {
1012     /* Only one concurrent iterate-action allowed per set */
1013     GNUNET_break (0);
1014     GNUNET_SERVICE_client_drop (cs->client);
1015     return;
1016   }
1017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018               "Iterating set %p in gen %u with %u content elements\n",
1019               (void *) set,
1020               set->current_generation,
1021               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1022   GNUNET_SERVICE_client_continue (cs->client);
1023   set->content->iterator_count++;
1024   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1025   set->iter_generation = set->current_generation;
1026   send_client_element (set);
1027 }
1028
1029
1030 /**
1031  * Called when a client wants to create a new set.  This is typically
1032  * the first request from a client, and includes the type of set
1033  * operation to be performed.
1034  *
1035  * @param cls client that sent the message
1036  * @param m message sent by the client
1037  */
1038 static void
1039 handle_client_create_set (void *cls,
1040                           const struct GNUNET_SET_CreateMessage *msg)
1041 {
1042   struct ClientState *cs = cls;
1043   struct Set *set;
1044
1045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046               "Client created new set (operation %u)\n",
1047               (uint32_t) ntohl (msg->operation));
1048   if (NULL != cs->set)
1049   {
1050     /* There can only be one set per client */
1051     GNUNET_break (0);
1052     GNUNET_SERVICE_client_drop (cs->client);
1053     return;
1054   }
1055   set = GNUNET_new (struct Set);
1056   switch (ntohl (msg->operation))
1057   {
1058   case GNUNET_SET_OPERATION_INTERSECTION:
1059     set->vt = _GSS_intersection_vt ();
1060     break;
1061   case GNUNET_SET_OPERATION_UNION:
1062     set->vt = _GSS_union_vt ();
1063     break;
1064   default:
1065     GNUNET_free (set);
1066     GNUNET_break (0);
1067     GNUNET_SERVICE_client_drop (cs->client);
1068     return;
1069   }
1070   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1071   set->state = set->vt->create ();
1072   if (NULL == set->state)
1073   {
1074     /* initialization failed (i.e. out of memory) */
1075     GNUNET_free (set);
1076     GNUNET_SERVICE_client_drop (cs->client);
1077     return;
1078   }
1079   set->content = GNUNET_new (struct SetContent);
1080   set->content->refcount = 1;
1081   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1082                                                                  GNUNET_YES);
1083   set->cs = cs;
1084   cs->set = set;
1085   GNUNET_SERVICE_client_continue (cs->client);
1086 }
1087
1088
1089 /**
1090  * Timeout happens iff:
1091  *  - we suggested an operation to our listener,
1092  *    but did not receive a response in time
1093  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1094  *
1095  * @param cls channel context
1096  * @param tc context information (why was this task triggered now)
1097  */
1098 static void
1099 incoming_timeout_cb (void *cls)
1100 {
1101   struct Operation *op = cls;
1102
1103   op->timeout_task = NULL;
1104   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1105               "Remote peer's incoming request timed out\n");
1106   incoming_destroy (op);
1107 }
1108
1109
1110 /**
1111  * Method called whenever another peer has added us to a channel the
1112  * other peer initiated.  Only called (once) upon reception of data
1113  * from a channel we listen on.
1114  *
1115  * The channel context represents the operation itself and gets added
1116  * to a DLL, from where it gets looked up when our local listener
1117  * client responds to a proposed/suggested operation or connects and
1118  * associates with this operation.
1119  *
1120  * @param cls closure
1121  * @param channel new handle to the channel
1122  * @param source peer that started the channel
1123  * @return initial channel context for the channel
1124  *         returns NULL on error
1125  */
1126 static void *
1127 channel_new_cb (void *cls,
1128                 struct GNUNET_CADET_Channel *channel,
1129                 const struct GNUNET_PeerIdentity *source)
1130 {
1131   struct Listener *listener = cls;
1132   struct Operation *op;
1133
1134   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135               "New incoming channel\n");
1136   op = GNUNET_new (struct Operation);
1137   op->listener = listener;
1138   op->peer = *source;
1139   op->channel = channel;
1140   op->mq = GNUNET_CADET_get_mq (op->channel);
1141   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1142                                        UINT32_MAX);
1143   op->timeout_task
1144     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1145                                     &incoming_timeout_cb,
1146                                     op);
1147   GNUNET_CONTAINER_DLL_insert (listener->op_head,
1148                                listener->op_tail,
1149                                op);
1150   return op;
1151 }
1152
1153
1154 /**
1155  * Function called whenever a channel is destroyed.  Should clean up
1156  * any associated state.  It must NOT call
1157  * GNUNET_CADET_channel_destroy() on the channel.
1158  *
1159  * The peer_disconnect function is part of a a virtual table set initially either
1160  * when a peer creates a new channel with us, or once we create
1161  * a new channel ourselves (evaluate).
1162  *
1163  * Once we know the exact type of operation (union/intersection), the vt is
1164  * replaced with an operation specific instance (_GSS_[op]_vt).
1165  *
1166  * @param channel_ctx place where local state associated
1167  *                   with the channel is stored
1168  * @param channel connection to the other end (henceforth invalid)
1169  */
1170 static void
1171 channel_end_cb (void *channel_ctx,
1172                 const struct GNUNET_CADET_Channel *channel)
1173 {
1174   struct Operation *op = channel_ctx;
1175
1176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177               "channel_end_cb called\n");
1178   op->channel = NULL;
1179   if (NULL != op->listener)
1180     incoming_destroy (op);
1181   else if (NULL != op->set)
1182     op->set->vt->channel_death (op);
1183   else
1184     _GSS_operation_destroy (op,
1185                             GNUNET_YES);
1186   GNUNET_free (op);
1187 }
1188
1189
1190 /**
1191  * Function called whenever an MQ-channel's transmission window size changes.
1192  *
1193  * The first callback in an outgoing channel will be with a non-zero value
1194  * and will mean the channel is connected to the destination.
1195  *
1196  * For an incoming channel it will be called immediately after the
1197  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1198  *
1199  * @param cls Channel closure.
1200  * @param channel Connection to the other end (henceforth invalid).
1201  * @param window_size New window size. If the is more messages than buffer size
1202  *                    this value will be negative..
1203  */
1204 static void
1205 channel_window_cb (void *cls,
1206                    const struct GNUNET_CADET_Channel *channel,
1207                    int window_size)
1208 {
1209   /* FIXME: not implemented, we could do flow control here... */
1210 }
1211
1212
1213 /**
1214  * Called when a client wants to create a new listener.
1215  *
1216  * @param cls client that sent the message
1217  * @param msg message sent by the client
1218  */
1219 static void
1220 handle_client_listen (void *cls,
1221                       const struct GNUNET_SET_ListenMessage *msg)
1222 {
1223   struct ClientState *cs = cls;
1224   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1225     GNUNET_MQ_hd_var_size (incoming_msg,
1226                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1227                            struct OperationRequestMessage,
1228                            NULL),
1229     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1230                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1231                            struct IBFMessage,
1232                            NULL),
1233     GNUNET_MQ_hd_var_size (union_p2p_elements,
1234                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1235                            struct GNUNET_SET_ElementMessage,
1236                            NULL),
1237     GNUNET_MQ_hd_var_size (union_p2p_offer,
1238                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1239                            struct GNUNET_MessageHeader,
1240                            NULL),
1241     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1242                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1243                            struct InquiryMessage,
1244                            NULL),
1245     GNUNET_MQ_hd_var_size (union_p2p_demand,
1246                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1247                            struct GNUNET_MessageHeader,
1248                            NULL),
1249     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1250                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1251                              struct GNUNET_MessageHeader,
1252                              NULL),
1253     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1254                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1255                              struct GNUNET_MessageHeader,
1256                              NULL),
1257     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1258                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1259                              struct GNUNET_MessageHeader,
1260                              NULL),
1261     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1262                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1263                            struct StrataEstimatorMessage,
1264                            NULL),
1265     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1266                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1267                            struct StrataEstimatorMessage,
1268                            NULL),
1269     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1270                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1271                            struct GNUNET_SET_ElementMessage,
1272                            NULL),
1273     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1274                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1275                              struct IntersectionElementInfoMessage,
1276                              NULL),
1277     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1278                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1279                            struct BFMessage,
1280                            NULL),
1281     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1282                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1283                              struct IntersectionDoneMessage,
1284                              NULL),
1285     GNUNET_MQ_handler_end ()
1286   };
1287   struct Listener *listener;
1288
1289   if (NULL != cs->listener)
1290   {
1291     /* max. one active listener per client! */
1292     GNUNET_break (0);
1293     GNUNET_SERVICE_client_drop (cs->client);
1294     return;
1295   }
1296   listener = GNUNET_new (struct Listener);
1297   listener->cs = cs;
1298   listener->app_id = msg->app_id;
1299   listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1300   GNUNET_CONTAINER_DLL_insert (listener_head,
1301                                listener_tail,
1302                                listener);
1303   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1304               "New listener created (op %u, port %s)\n",
1305               listener->operation,
1306               GNUNET_h2s (&listener->app_id));
1307   listener->open_port
1308     = GNUNET_CADET_open_port (cadet,
1309                               &msg->app_id,
1310                               &channel_new_cb,
1311                               listener,
1312                               &channel_window_cb,
1313                               &channel_end_cb,
1314                               cadet_handlers);
1315   GNUNET_SERVICE_client_continue (cs->client);
1316 }
1317
1318
1319 /**
1320  * Called when the listening client rejects an operation
1321  * request by another peer.
1322  *
1323  * @param cls client that sent the message
1324  * @param msg message sent by the client
1325  */
1326 static void
1327 handle_client_reject (void *cls,
1328                       const struct GNUNET_SET_RejectMessage *msg)
1329 {
1330   struct ClientState *cs = cls;
1331   struct Operation *op;
1332
1333   op = get_incoming (ntohl (msg->accept_reject_id));
1334   if (NULL == op)
1335   {
1336     /* no matching incoming operation for this reject;
1337        could be that the other peer already disconnected... */
1338     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1339                 "Client rejected unknown operation %u\n",
1340                 (unsigned int) ntohl (msg->accept_reject_id));
1341     GNUNET_SERVICE_client_continue (cs->client);
1342     return;
1343   }
1344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1345               "Peer request (op %u, app %s) rejected by client\n",
1346               op->listener->operation,
1347               GNUNET_h2s (&cs->listener->app_id));
1348   GNUNET_CADET_channel_destroy (op->channel);
1349   GNUNET_SERVICE_client_continue (cs->client);
1350 }
1351
1352
1353 /**
1354  * Called when a client wants to add or remove an element to a set it inhabits.
1355  *
1356  * @param cls client that sent the message
1357  * @param msg message sent by the client
1358  */
1359 static int
1360 check_client_mutation (void *cls,
1361                        const struct GNUNET_SET_ElementMessage *msg)
1362 {
1363   /* NOTE: Technically, we should probably check with the
1364      block library whether the element we are given is well-formed */
1365   return GNUNET_OK;
1366 }
1367
1368
1369 /**
1370  * Called when a client wants to add or remove an element to a set it inhabits.
1371  *
1372  * @param cls client that sent the message
1373  * @param msg message sent by the client
1374  */
1375 static void
1376 handle_client_mutation (void *cls,
1377                         const struct GNUNET_SET_ElementMessage *msg)
1378 {
1379   struct ClientState *cs = cls;
1380   struct Set *set;
1381
1382   if (NULL == (set = cs->set))
1383   {
1384     /* client without a set requested an operation */
1385     GNUNET_break (0);
1386     GNUNET_SERVICE_client_drop (cs->client);
1387     return;
1388   }
1389   GNUNET_SERVICE_client_continue (cs->client);
1390
1391   if (0 != set->content->iterator_count)
1392   {
1393     struct PendingMutation *pm;
1394
1395     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1396                 "Scheduling mutation on set\n");
1397     pm = GNUNET_new (struct PendingMutation);
1398     pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1399     pm->set = set;
1400     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1401                                       set->content->pending_mutations_tail,
1402                                       pm);
1403     return;
1404   }
1405   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1406               "Executing mutation on set\n");
1407   execute_mutation (set,
1408                     msg);
1409 }
1410
1411
1412 /**
1413  * Advance the current generation of a set,
1414  * adding exclusion ranges if necessary.
1415  *
1416  * @param set the set where we want to advance the generation
1417  */
1418 static void
1419 advance_generation (struct Set *set)
1420 {
1421   struct GenerationRange r;
1422
1423   if (set->current_generation == set->content->latest_generation)
1424   {
1425     set->content->latest_generation++;
1426     set->current_generation++;
1427     return;
1428   }
1429
1430   GNUNET_assert (set->current_generation < set->content->latest_generation);
1431
1432   r.start = set->current_generation + 1;
1433   r.end = set->content->latest_generation + 1;
1434   set->content->latest_generation = r.end;
1435   set->current_generation = r.end;
1436   GNUNET_array_append (set->excluded_generations,
1437                        set->excluded_generations_size,
1438                        r);
1439 }
1440
1441
1442 /**
1443  * Called when a client wants to initiate a set operation with another
1444  * peer.  Initiates the CADET connection to the listener and sends the
1445  * request.
1446  *
1447  * @param cls client that sent the message
1448  * @param msg message sent by the client
1449  * @return #GNUNET_OK if the message is well-formed
1450  */
1451 static int
1452 check_client_evaluate (void *cls,
1453                         const struct GNUNET_SET_EvaluateMessage *msg)
1454 {
1455   /* FIXME: suboptimal, even if the context below could be NULL,
1456      there are malformed messages this does not check for... */
1457   return GNUNET_OK;
1458 }
1459
1460
1461 /**
1462  * Called when a client wants to initiate a set operation with another
1463  * peer.  Initiates the CADET connection to the listener and sends the
1464  * request.
1465  *
1466  * @param cls client that sent the message
1467  * @param msg message sent by the client
1468  */
1469 static void
1470 handle_client_evaluate (void *cls,
1471                         const struct GNUNET_SET_EvaluateMessage *msg)
1472 {
1473   struct ClientState *cs = cls;
1474   struct Operation *op = GNUNET_new (struct Operation);
1475   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1476     GNUNET_MQ_hd_var_size (incoming_msg,
1477                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1478                            struct OperationRequestMessage,
1479                            op),
1480     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1481                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1482                            struct IBFMessage,
1483                            op),
1484     GNUNET_MQ_hd_var_size (union_p2p_elements,
1485                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1486                            struct GNUNET_SET_ElementMessage,
1487                            op),
1488     GNUNET_MQ_hd_var_size (union_p2p_offer,
1489                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1490                            struct GNUNET_MessageHeader,
1491                            op),
1492     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1493                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1494                            struct InquiryMessage,
1495                            op),
1496     GNUNET_MQ_hd_var_size (union_p2p_demand,
1497                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1498                            struct GNUNET_MessageHeader,
1499                            op),
1500     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1501                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1502                              struct GNUNET_MessageHeader,
1503                              op),
1504     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1505                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1506                              struct GNUNET_MessageHeader,
1507                              op),
1508     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1509                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1510                              struct GNUNET_MessageHeader,
1511                              op),
1512     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1513                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1514                            struct StrataEstimatorMessage,
1515                            op),
1516     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1517                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1518                            struct StrataEstimatorMessage,
1519                            op),
1520     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1521                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1522                            struct GNUNET_SET_ElementMessage,
1523                            op),
1524     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1525                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1526                              struct IntersectionElementInfoMessage,
1527                              op),
1528     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1529                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1530                            struct BFMessage,
1531                            op),
1532     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1533                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1534                              struct IntersectionDoneMessage,
1535                              op),
1536     GNUNET_MQ_handler_end ()
1537   };
1538   struct Set *set;
1539   const struct GNUNET_MessageHeader *context;
1540
1541   if (NULL == (set = cs->set))
1542   {
1543     GNUNET_break (0);
1544     GNUNET_free (op);
1545     GNUNET_SERVICE_client_drop (cs->client);
1546     return;
1547   }
1548   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1549                                        UINT32_MAX);
1550   op->peer = msg->target_peer;
1551   op->result_mode = ntohl (msg->result_mode);
1552   op->client_request_id = ntohl (msg->request_id);
1553   op->byzantine = msg->byzantine;
1554   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1555   op->force_full = msg->force_full;
1556   op->force_delta = msg->force_delta;
1557   context = GNUNET_MQ_extract_nested_mh (msg);
1558
1559   /* Advance generation values, so that
1560      mutations won't interfer with the running operation. */
1561   op->set = set;
1562   op->generation_created = set->current_generation;
1563   advance_generation (set);
1564   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1565                                set->ops_tail,
1566                                op);
1567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1568               "Creating new CADET channel to port %s for set operation type %u\n",
1569               GNUNET_h2s (&msg->app_id),
1570               set->operation);
1571   op->channel = GNUNET_CADET_channel_create (cadet,
1572                                              op,
1573                                              &msg->target_peer,
1574                                              &msg->app_id,
1575                                              GNUNET_CADET_OPTION_RELIABLE,
1576                                              &channel_window_cb,
1577                                              &channel_end_cb,
1578                                              cadet_handlers);
1579   op->mq = GNUNET_CADET_get_mq (op->channel);
1580   op->state = set->vt->evaluate (op,
1581                                  context);
1582   if (NULL == op->state)
1583   {
1584     GNUNET_break (0);
1585     GNUNET_SERVICE_client_drop (cs->client);
1586     return;
1587   }
1588   GNUNET_SERVICE_client_continue (cs->client);
1589 }
1590
1591
1592 /**
1593  * Handle an ack from a client, and send the next element. Note
1594  * that we only expect acks for set elements, not after the
1595  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1596  *
1597  * @param cls client the client
1598  * @param ack the message
1599  */
1600 static void
1601 handle_client_iter_ack (void *cls,
1602                         const struct GNUNET_SET_IterAckMessage *ack)
1603 {
1604   struct ClientState *cs = cls;
1605   struct Set *set;
1606
1607   if (NULL == (set = cs->set))
1608   {
1609     /* client without a set acknowledged receiving a value */
1610     GNUNET_break (0);
1611     GNUNET_SERVICE_client_drop (cs->client);
1612     return;
1613   }
1614   if (NULL == set->iter)
1615   {
1616     /* client sent an ack, but we were not expecting one (as
1617        set iteration has finished) */
1618     GNUNET_break (0);
1619     GNUNET_SERVICE_client_drop (cs->client);
1620     return;
1621   }
1622   GNUNET_SERVICE_client_continue (cs->client);
1623   if (ntohl (ack->send_more))
1624   {
1625     send_client_element (set);
1626   }
1627   else
1628   {
1629     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1630     set->iter = NULL;
1631     set->iteration_id++;
1632   }
1633 }
1634
1635
1636 /**
1637  * Handle a request from the client to copy a set.
1638  *
1639  * @param cls the client
1640  * @param mh the message
1641  */
1642 static void
1643 handle_client_copy_lazy_prepare (void *cls,
1644                                  const struct GNUNET_MessageHeader *mh)
1645 {
1646   struct ClientState *cs = cls;
1647   struct Set *set;
1648   struct LazyCopyRequest *cr;
1649   struct GNUNET_MQ_Envelope *ev;
1650   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1651
1652   if (NULL == (set = cs->set))
1653   {
1654     /* client without a set requested an operation */
1655     GNUNET_break (0);
1656     GNUNET_SERVICE_client_drop (cs->client);
1657     return;
1658   }
1659   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1660               "Client requested creation of lazy copy\n");
1661   cr = GNUNET_new (struct LazyCopyRequest);
1662   cr->cookie = ++lazy_copy_cookie;
1663   cr->source_set = set;
1664   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1665                                lazy_copy_tail,
1666                                cr);
1667   ev = GNUNET_MQ_msg (resp_msg,
1668                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1669   resp_msg->cookie = cr->cookie;
1670   GNUNET_MQ_send (set->cs->mq,
1671                   ev);
1672   GNUNET_SERVICE_client_continue (cs->client);
1673 }
1674
1675
1676 /**
1677  * Handle a request from the client to connect to a copy of a set.
1678  *
1679  * @param cls the client
1680  * @param msg the message
1681  */
1682 static void
1683 handle_client_copy_lazy_connect (void *cls,
1684                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1685 {
1686   struct ClientState *cs = cls;
1687   struct LazyCopyRequest *cr;
1688   struct Set *set;
1689   int found;
1690
1691   if (NULL != cs->set)
1692   {
1693     /* There can only be one set per client */
1694     GNUNET_break (0);
1695     GNUNET_SERVICE_client_drop (cs->client);
1696     return;
1697   }
1698   found = GNUNET_NO;
1699   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1700   {
1701     if (cr->cookie == msg->cookie)
1702     {
1703       found = GNUNET_YES;
1704       break;
1705     }
1706   }
1707   if (GNUNET_NO == found)
1708   {
1709     /* client asked for copy with cookie we don't know */
1710     GNUNET_break (0);
1711     GNUNET_SERVICE_client_drop (cs->client);
1712     return;
1713   }
1714   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1715                                lazy_copy_tail,
1716                                cr);
1717   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1718               "Client %p requested use of lazy copy\n",
1719               cs);
1720   set = GNUNET_new (struct Set);
1721   switch (cr->source_set->operation)
1722   {
1723   case GNUNET_SET_OPERATION_INTERSECTION:
1724     set->vt = _GSS_intersection_vt ();
1725     break;
1726   case GNUNET_SET_OPERATION_UNION:
1727     set->vt = _GSS_union_vt ();
1728     break;
1729   default:
1730     GNUNET_assert (0);
1731     return;
1732   }
1733
1734   if (NULL == set->vt->copy_state)
1735   {
1736     /* Lazy copy not supported for this set operation */
1737     GNUNET_break (0);
1738     GNUNET_free (set);
1739     GNUNET_free (cr);
1740     GNUNET_SERVICE_client_drop (cs->client);
1741     return;
1742   }
1743
1744   set->operation = cr->source_set->operation;
1745   set->state = set->vt->copy_state (cr->source_set->state);
1746   set->content = cr->source_set->content;
1747   set->content->refcount++;
1748
1749   set->current_generation = cr->source_set->current_generation;
1750   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1751   set->excluded_generations
1752     = GNUNET_memdup (cr->source_set->excluded_generations,
1753                      set->excluded_generations_size * sizeof (struct GenerationRange));
1754
1755   /* Advance the generation of the new set, so that mutations to the
1756      of the cloned set and the source set are independent. */
1757   advance_generation (set);
1758   set->cs = cs;
1759   cs->set = set;
1760   GNUNET_free (cr);
1761   GNUNET_SERVICE_client_continue (cs->client);
1762 }
1763
1764
1765 /**
1766  * Handle a request from the client to cancel a running set operation.
1767  *
1768  * @param cls the client
1769  * @param msg the message
1770  */
1771 static void
1772 handle_client_cancel (void *cls,
1773                       const struct GNUNET_SET_CancelMessage *msg)
1774 {
1775   struct ClientState *cs = cls;
1776   struct Set *set;
1777   struct Operation *op;
1778   int found;
1779
1780   if (NULL == (set = cs->set))
1781   {
1782     /* client without a set requested an operation */
1783     GNUNET_break (0);
1784     GNUNET_SERVICE_client_drop (cs->client);
1785     return;
1786   }
1787   found = GNUNET_NO;
1788   for (op = set->ops_head; NULL != op; op = op->next)
1789   {
1790     if (op->client_request_id == ntohl (msg->request_id))
1791     {
1792       found = GNUNET_YES;
1793       break;
1794     }
1795   }
1796   if (GNUNET_NO == found)
1797   {
1798     /* It may happen that the operation was already destroyed due to
1799      * the other peer disconnecting.  The client may not know about this
1800      * yet and try to cancel the (just barely non-existent) operation.
1801      * So this is not a hard error.
1802      */
1803     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1804                 "Client canceled non-existent op %u\n",
1805                 (uint32_t) ntohl (msg->request_id));
1806   }
1807   else
1808   {
1809     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1810                 "Client requested cancel for op %u\n",
1811                 (uint32_t) ntohl (msg->request_id));
1812     _GSS_operation_destroy (op,
1813                             GNUNET_YES);
1814   }
1815   GNUNET_SERVICE_client_continue (cs->client);
1816 }
1817
1818
1819 /**
1820  * Handle a request from the client to accept a set operation that
1821  * came from a remote peer.  We forward the accept to the associated
1822  * operation for handling
1823  *
1824  * @param cls the client
1825  * @param msg the message
1826  */
1827 static void
1828 handle_client_accept (void *cls,
1829                       const struct GNUNET_SET_AcceptMessage *msg)
1830 {
1831   struct ClientState *cs = cls;
1832   struct Set *set;
1833   struct Operation *op;
1834   struct GNUNET_SET_ResultMessage *result_message;
1835   struct GNUNET_MQ_Envelope *ev;
1836   struct Listener *listener;
1837
1838   if (NULL == (set = cs->set))
1839   {
1840     /* client without a set requested to accept */
1841     GNUNET_break (0);
1842     GNUNET_SERVICE_client_drop (cs->client);
1843     return;
1844   }
1845   op = get_incoming (ntohl (msg->accept_reject_id));
1846   if (NULL == op)
1847   {
1848     /* It is not an error if the set op does not exist -- it may
1849      * have been destroyed when the partner peer disconnected. */
1850     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1851                 "Client %p accepted request %u of listener %p that is no longer active\n",
1852                 cs,
1853                 ntohl (msg->accept_reject_id),
1854                 cs->listener);
1855     ev = GNUNET_MQ_msg (result_message,
1856                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1857     result_message->request_id = msg->request_id;
1858     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1859     GNUNET_MQ_send (set->cs->mq,
1860                     ev);
1861     GNUNET_SERVICE_client_continue (cs->client);
1862     return;
1863   }
1864   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1865               "Client accepting request %u\n",
1866               (uint32_t) ntohl (msg->accept_reject_id));
1867   listener = op->listener;
1868   op->listener = NULL;
1869   GNUNET_CONTAINER_DLL_remove (listener->op_head,
1870                                listener->op_tail,
1871                                op);
1872   op->set = set;
1873   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1874                                set->ops_tail,
1875                                op);
1876   op->client_request_id = ntohl (msg->request_id);
1877   op->result_mode = ntohl (msg->result_mode);
1878   op->byzantine = msg->byzantine;
1879   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1880   op->force_full = msg->force_full;
1881   op->force_delta = msg->force_delta;
1882
1883   /* Advance generation values, so that future mutations do not
1884      interfer with the running operation. */
1885   op->generation_created = set->current_generation;
1886   advance_generation (set);
1887   GNUNET_assert (NULL == op->state);
1888   op->state = set->vt->accept (op);
1889   if (NULL == op->state)
1890   {
1891     GNUNET_break (0);
1892     GNUNET_SERVICE_client_drop (cs->client);
1893     return;
1894   }
1895   /* Now allow CADET to continue, as we did not do this in
1896      #handle_incoming_msg (as we wanted to first see if the
1897      local client would accept the request). */
1898   GNUNET_CADET_receive_done (op->channel);
1899   GNUNET_SERVICE_client_continue (cs->client);
1900 }
1901
1902
1903 /**
1904  * Called to clean up, after a shutdown has been requested.
1905  *
1906  * @param cls closure, NULL
1907  */
1908 static void
1909 shutdown_task (void *cls)
1910 {
1911   /* Delay actual shutdown to allow service to disconnect clients */
1912   if (NULL != cadet)
1913   {
1914     GNUNET_CADET_disconnect (cadet);
1915     cadet = NULL;
1916   }
1917   GNUNET_STATISTICS_destroy (_GSS_statistics,
1918                              GNUNET_YES);
1919   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1920               "handled shutdown request\n");
1921 }
1922
1923
1924 /**
1925  * Function called by the service's run
1926  * method to run service-specific setup code.
1927  *
1928  * @param cls closure
1929  * @param cfg configuration to use
1930  * @param service the initialized service
1931  */
1932 static void
1933 run (void *cls,
1934      const struct GNUNET_CONFIGURATION_Handle *cfg,
1935      struct GNUNET_SERVICE_Handle *service)
1936 {
1937   /* FIXME: need to modify SERVICE (!) API to allow
1938      us to run a shutdown task *after* clients were
1939      forcefully disconnected! */
1940   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1941                                  NULL);
1942   _GSS_statistics = GNUNET_STATISTICS_create ("set",
1943                                               cfg);
1944   cadet = GNUNET_CADET_connect (cfg);
1945   if (NULL == cadet)
1946   {
1947     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1948                 _("Could not connect to CADET service\n"));
1949     GNUNET_SCHEDULER_shutdown ();
1950     return;
1951   }
1952 }
1953
1954
1955 /**
1956  * Define "main" method using service macro.
1957  */
1958 GNUNET_SERVICE_MAIN
1959 ("set",
1960  GNUNET_SERVICE_OPTION_NONE,
1961  &run,
1962  &client_connect_cb,
1963  &client_disconnect_cb,
1964  NULL,
1965  GNUNET_MQ_hd_fixed_size (client_accept,
1966                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1967                           struct GNUNET_SET_AcceptMessage,
1968                           NULL),
1969  GNUNET_MQ_hd_fixed_size (client_iter_ack,
1970                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1971                           struct GNUNET_SET_IterAckMessage,
1972                           NULL),
1973  GNUNET_MQ_hd_var_size (client_mutation,
1974                         GNUNET_MESSAGE_TYPE_SET_ADD,
1975                         struct GNUNET_SET_ElementMessage,
1976                         NULL),
1977  GNUNET_MQ_hd_fixed_size (client_create_set,
1978                           GNUNET_MESSAGE_TYPE_SET_CREATE,
1979                           struct GNUNET_SET_CreateMessage,
1980                           NULL),
1981  GNUNET_MQ_hd_fixed_size (client_iterate,
1982                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1983                           struct GNUNET_MessageHeader,
1984                           NULL),
1985  GNUNET_MQ_hd_var_size (client_evaluate,
1986                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1987                         struct GNUNET_SET_EvaluateMessage,
1988                         NULL),
1989  GNUNET_MQ_hd_fixed_size (client_listen,
1990                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
1991                           struct GNUNET_SET_ListenMessage,
1992                           NULL),
1993  GNUNET_MQ_hd_fixed_size (client_reject,
1994                           GNUNET_MESSAGE_TYPE_SET_REJECT,
1995                           struct GNUNET_SET_RejectMessage,
1996                           NULL),
1997  GNUNET_MQ_hd_var_size (client_mutation,
1998                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
1999                         struct GNUNET_SET_ElementMessage,
2000                         NULL),
2001  GNUNET_MQ_hd_fixed_size (client_cancel,
2002                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2003                           struct GNUNET_SET_CancelMessage,
2004                           NULL),
2005  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2006                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2007                           struct GNUNET_MessageHeader,
2008                           NULL),
2009  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2010                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2011                           struct GNUNET_SET_CopyLazyConnectMessage,
2012                           NULL),
2013  GNUNET_MQ_handler_end ());
2014
2015
2016 /* end of gnunet-service-set.c */