f98d43a7de541fed04c42f0d233dffb8b6fc9153
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
31
32 /**
33  * How long do we hold on to an incoming channel if there is
34  * no local listener before giving up?
35  */
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38
39 /**
40  * Lazy copy requests made by a client.
41  */
42 struct LazyCopyRequest
43 {
44   /**
45    * Kept in a DLL.
46    */
47   struct LazyCopyRequest *prev;
48
49   /**
50    * Kept in a DLL.
51    */
52   struct LazyCopyRequest *next;
53
54   /**
55    * Which set are we supposed to copy?
56    */
57   struct Set *source_set;
58
59   /**
60    * Cookie identifying the request.
61    */
62   uint32_t cookie;
63
64 };
65
66
67 /**
68  * A listener is inhabited by a client, and waits for evaluation
69  * requests from remote peers.
70  */
71 struct Listener
72 {
73   /**
74    * Listeners are held in a doubly linked list.
75    */
76   struct Listener *next;
77
78   /**
79    * Listeners are held in a doubly linked list.
80    */
81   struct Listener *prev;
82
83   /**
84    * Head of DLL of operations this listener is responsible for.
85    * Once the client has accepted/declined the operation, the
86    * operation is moved to the respective set's operation DLLS.
87    */
88   struct Operation *op_head;
89
90   /**
91    * Tail of DLL of operations this listener is responsible for.
92    * Once the client has accepted/declined the operation, the
93    * operation is moved to the respective set's operation DLLS.
94    */
95   struct Operation *op_tail;
96
97   /**
98    * Client that owns the listener.
99    * Only one client may own a listener.
100    */
101   struct ClientState *cs;
102
103   /**
104    * The port we are listening on with CADET.
105    */
106   struct GNUNET_CADET_Port *open_port;
107
108   /**
109    * Application ID for the operation, used to distinguish
110    * multiple operations of the same type with the same peer.
111    */
112   struct GNUNET_HashCode app_id;
113
114   /**
115    * The type of the operation.
116    */
117   enum GNUNET_SET_OperationType operation;
118 };
119
120
121 /**
122  * Handle to the cadet service, used to listen for and connect to
123  * remote peers.
124  */
125 static struct GNUNET_CADET_Handle *cadet;
126
127 /**
128  * DLL of lazy copy requests by this client.
129  */
130 static struct LazyCopyRequest *lazy_copy_head;
131
132 /**
133  * DLL of lazy copy requests by this client.
134  */
135 static struct LazyCopyRequest *lazy_copy_tail;
136
137 /**
138  * Generator for unique cookie we set per lazy copy request.
139  */
140 static uint32_t lazy_copy_cookie;
141
142 /**
143  * Statistics handle.
144  */
145 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
146
147 /**
148  * Listeners are held in a doubly linked list.
149  */
150 static struct Listener *listener_head;
151
152 /**
153  * Listeners are held in a doubly linked list.
154  */
155 static struct Listener *listener_tail;
156
157 /**
158  * Counter for allocating unique IDs for clients, used to identify
159  * incoming operation requests from remote peers, that the client can
160  * choose to accept or refuse.  0 must not be used (reserved for
161  * uninitialized).
162  */
163 static uint32_t suggest_id;
164
165
166 /**
167  * Get the incoming socket associated with the given id.
168  *
169  * @param listener the listener to look in
170  * @param id id to look for
171  * @return the incoming socket associated with the id,
172  *         or NULL if there is none
173  */
174 static struct Operation *
175 get_incoming (uint32_t id)
176 {
177   for (struct Listener *listener = listener_head;
178        NULL != listener;
179        listener = listener->next)
180   {
181     for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
182       if (op->suggest_id == id)
183         return op;
184   }
185   return NULL;
186 }
187
188
189 /**
190  * Destroy an incoming request from a remote peer
191  *
192  * @param op remote request to destroy
193  */
194 static void
195 incoming_destroy (struct Operation *op)
196 {
197   struct Listener *listener;
198   struct GNUNET_CADET_Channel *channel;
199
200   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
201               "Destroying incoming operation %p\n",
202               op);
203   if (NULL != (listener = op->listener))
204   {
205     GNUNET_CONTAINER_DLL_remove (listener->op_head,
206                                  listener->op_tail,
207                                  op);
208     op->listener = NULL;
209   }
210   if (NULL != op->timeout_task)
211   {
212     GNUNET_SCHEDULER_cancel (op->timeout_task);
213     op->timeout_task = NULL;
214   }
215   if (NULL != (channel = op->channel))
216   {
217     op->channel = NULL;
218     GNUNET_CADET_channel_destroy (channel);
219   }
220 }
221
222
223 /**
224  * Context for the #garbage_collect_cb().
225  */
226 struct GarbageContext
227 {
228
229   /**
230    * Map for which we are garbage collecting removed elements.
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *map;
233
234   /**
235    * Lowest generation for which an operation is still pending.
236    */
237   unsigned int min_op_generation;
238
239   /**
240    * Largest generation for which an operation is still pending.
241    */
242   unsigned int max_op_generation;
243
244 };
245
246
247 /**
248  * Function invoked to check if an element can be removed from
249  * the set's history because it is no longer needed.
250  *
251  * @param cls the `struct GarbageContext *`
252  * @param key key of the element in the map
253  * @param value the `struct ElementEntry *`
254  * @return #GNUNET_OK (continue to iterate)
255  */
256 static int
257 garbage_collect_cb (void *cls,
258                     const struct GNUNET_HashCode *key,
259                     void *value)
260 {
261   //struct GarbageContext *gc = cls;
262   //struct ElementEntry *ee = value;
263
264   //if (GNUNET_YES != ee->removed)
265   //  return GNUNET_OK;
266   //if ( (gc->max_op_generation < ee->generation_added) ||
267   //     (ee->generation_removed > gc->min_op_generation) )
268   //{
269   //  GNUNET_assert (GNUNET_YES ==
270   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
271   //                                                       key,
272   //                                                       ee));
273   //  GNUNET_free (ee);
274   //}
275   return GNUNET_OK;
276 }
277
278
279 /**
280  * Collect and destroy elements that are not needed anymore, because
281  * their lifetime (as determined by their generation) does not overlap
282  * with any active set operation.
283  *
284  * @param set set to garbage collect
285  */
286 static void
287 collect_generation_garbage (struct Set *set)
288 {
289   struct GarbageContext gc;
290
291   gc.min_op_generation = UINT_MAX;
292   gc.max_op_generation = 0;
293   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
294   {
295     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
296                                        op->generation_created);
297     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
298                                        op->generation_created);
299   }
300   gc.map = set->content->elements;
301   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
302                                          &garbage_collect_cb,
303                                          &gc);
304 }
305
306
307 /**
308  * Is @a generation in the range of exclusions?
309  *
310  * @param generation generation to query
311  * @param excluded array of generations where the element is excluded
312  * @param excluded_size length of the @a excluded array
313  * @return #GNUNET_YES if @a generation is in any of the ranges
314  */
315 static int
316 is_excluded_generation (unsigned int generation,
317                         struct GenerationRange *excluded,
318                         unsigned int excluded_size)
319 {
320   for (unsigned int i = 0; i < excluded_size; i++)
321     if ( (generation >= excluded[i].start) &&
322          (generation < excluded[i].end) )
323       return GNUNET_YES;
324   return GNUNET_NO;
325 }
326
327
328 /**
329  * Is element @a ee part of the set during @a query_generation?
330  *
331  * @param ee element to test
332  * @param query_generation generation to query
333  * @param excluded array of generations where the element is excluded
334  * @param excluded_size length of the @a excluded array
335  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
336  */
337 static int
338 is_element_of_generation (struct ElementEntry *ee,
339                           unsigned int query_generation,
340                           struct GenerationRange *excluded,
341                           unsigned int excluded_size)
342 {
343   struct MutationEvent *mut;
344   int is_present;
345
346   GNUNET_assert (NULL != ee->mutations);
347   if (GNUNET_YES ==
348       is_excluded_generation (query_generation,
349                               excluded,
350                               excluded_size))
351   {
352     GNUNET_break (0);
353     return GNUNET_NO;
354   }
355
356   is_present = GNUNET_NO;
357
358   /* Could be made faster with binary search, but lists
359      are small, so why bother. */
360   for (unsigned int i = 0; i < ee->mutations_size; i++)
361   {
362     mut = &ee->mutations[i];
363
364     if (mut->generation > query_generation)
365     {
366       /* The mutation doesn't apply to our generation
367          anymore.  We can'b break here, since mutations aren't
368          sorted by generation. */
369       continue;
370     }
371
372     if (GNUNET_YES ==
373         is_excluded_generation (mut->generation,
374                                 excluded,
375                                 excluded_size))
376     {
377       /* The generation is excluded (because it belongs to another
378          fork via a lazy copy) and thus mutations aren't considered
379          for membership testing. */
380       continue;
381     }
382
383     /* This would be an inconsistency in how we manage mutations. */
384     if ( (GNUNET_YES == is_present) &&
385          (GNUNET_YES == mut->added) )
386       GNUNET_assert (0);
387     /* Likewise. */
388     if ( (GNUNET_NO == is_present) &&
389          (GNUNET_NO == mut->added) )
390       GNUNET_assert (0);
391
392     is_present = mut->added;
393   }
394
395   return is_present;
396 }
397
398
399 /**
400  * Is element @a ee part of the set used by @a op?
401  *
402  * @param ee element to test
403  * @param op operation the defines the set and its generation
404  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
405  */
406 int
407 _GSS_is_element_of_operation (struct ElementEntry *ee,
408                               struct Operation *op)
409 {
410   return is_element_of_generation (ee,
411                                    op->generation_created,
412                                    op->set->excluded_generations,
413                                    op->set->excluded_generations_size);
414 }
415
416
417 /**
418  * Destroy the given operation.  Used for any operation where both
419  * peers were known and that thus actually had a vt and channel.  Must
420  * not be used for operations where 'listener' is still set and we do
421  * not know the other peer.
422  *
423  * Call the implementation-specific cancel function of the operation.
424  * Disconnects from the remote peer.  Does not disconnect the client,
425  * as there may be multiple operations per set.
426  *
427  * @param op operation to destroy
428  * @param gc #GNUNET_YES to perform garbage collection on the set
429  */
430 void
431 _GSS_operation_destroy (struct Operation *op,
432                         int gc)
433 {
434   struct Set *set = op->set;
435   struct GNUNET_CADET_Channel *channel;
436
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438               "Destroying operation %p\n",
439               op);
440   GNUNET_assert (NULL == op->listener);
441   if (NULL != op->state)
442   {
443     set->vt->cancel (op);
444     op->state = NULL;
445   }
446   if (NULL != set)
447   {
448     GNUNET_CONTAINER_DLL_remove (set->ops_head,
449                                  set->ops_tail,
450                                  op);
451     op->set = NULL;
452   }
453   if (NULL != op->context_msg)
454   {
455     GNUNET_free (op->context_msg);
456     op->context_msg = NULL;
457   }
458   if (NULL != (channel = op->channel))
459   {
460     /* This will free op; called conditionally as this helper function
461        is also called from within the channel disconnect handler. */
462     op->channel = NULL;
463     GNUNET_CADET_channel_destroy (channel);
464   }
465   if ( (NULL != set) &&
466        (GNUNET_YES == gc) )
467     collect_generation_garbage (set);
468   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
469    * there was a channel end handler that will free 'op' on the call stack. */
470 }
471
472
473 /**
474  * Callback called when a client connects to the service.
475  *
476  * @param cls closure for the service
477  * @param c the new client that connected to the service
478  * @param mq the message queue used to send messages to the client
479  * @return @a `struct ClientState`
480  */
481 static void *
482 client_connect_cb (void *cls,
483                    struct GNUNET_SERVICE_Client *c,
484                    struct GNUNET_MQ_Handle *mq)
485 {
486   struct ClientState *cs;
487
488   cs = GNUNET_new (struct ClientState);
489   cs->client = c;
490   cs->mq = mq;
491   return cs;
492 }
493
494
495 /**
496  * Iterator over hash map entries to free element entries.
497  *
498  * @param cls closure
499  * @param key current key code
500  * @param value a `struct ElementEntry *` to be free'd
501  * @return #GNUNET_YES (continue to iterate)
502  */
503 static int
504 destroy_elements_iterator (void *cls,
505                            const struct GNUNET_HashCode *key,
506                            void *value)
507 {
508   struct ElementEntry *ee = value;
509
510   GNUNET_free_non_null (ee->mutations);
511   GNUNET_free (ee);
512   return GNUNET_YES;
513 }
514
515
516 /**
517  * Clean up after a client has disconnected
518  *
519  * @param cls closure, unused
520  * @param client the client to clean up after
521  * @param internal_cls the `struct ClientState`
522  */
523 static void
524 client_disconnect_cb (void *cls,
525                       struct GNUNET_SERVICE_Client *client,
526                       void *internal_cls)
527 {
528   struct ClientState *cs = internal_cls;
529   struct Operation *op;
530   struct Listener *listener;
531   struct Set *set;
532
533   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
534               "Client disconnected, cleaning up\n");
535   if (NULL != (set = cs->set))
536   {
537     struct SetContent *content = set->content;
538     struct PendingMutation *pm;
539     struct PendingMutation *pm_current;
540     struct LazyCopyRequest *lcr;
541
542     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543                 "Destroying client's set\n");
544     /* Destroy pending set operations */
545     while (NULL != set->ops_head)
546       _GSS_operation_destroy (set->ops_head,
547                               GNUNET_NO);
548
549     /* Destroy operation-specific state */
550     GNUNET_assert (NULL != set->state);
551     set->vt->destroy_set (set->state);
552     set->state = NULL;
553
554     /* Clean up ongoing iterations */
555     if (NULL != set->iter)
556     {
557       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
558       set->iter = NULL;
559       set->iteration_id++;
560     }
561
562     /* discard any pending mutations that reference this set */
563     pm = content->pending_mutations_head;
564     while (NULL != pm)
565     {
566       pm_current = pm;
567       pm = pm->next;
568       if (pm_current->set == set)
569       {
570         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
571                                      content->pending_mutations_tail,
572                                      pm_current);
573         GNUNET_free (pm_current);
574       }
575     }
576
577     /* free set content (or at least decrement RC) */
578     set->content = NULL;
579     GNUNET_assert (0 != content->refcount);
580     content->refcount--;
581     if (0 == content->refcount)
582     {
583       GNUNET_assert (NULL != content->elements);
584       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
585                                              &destroy_elements_iterator,
586                                              NULL);
587       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
588       content->elements = NULL;
589       GNUNET_free (content);
590     }
591     GNUNET_free_non_null (set->excluded_generations);
592     set->excluded_generations = NULL;
593
594     /* remove set from pending copy requests */
595     lcr = lazy_copy_head;
596     while (NULL != lcr)
597     {
598       struct LazyCopyRequest *lcr_current = lcr;
599
600       lcr = lcr->next;
601       if (lcr_current->source_set == set)
602       {
603         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
604                                      lazy_copy_tail,
605                                      lcr_current);
606         GNUNET_free (lcr_current);
607       }
608     }
609     GNUNET_free (set);
610   }
611
612   if (NULL != (listener = cs->listener))
613   {
614     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615                 "Destroying client's listener\n");
616     GNUNET_CADET_close_port (listener->open_port);
617     listener->open_port = NULL;
618     while (NULL != (op = listener->op_head))
619       incoming_destroy (op);
620     GNUNET_CONTAINER_DLL_remove (listener_head,
621                                  listener_tail,
622                                  listener);
623     GNUNET_free (listener);
624   }
625   GNUNET_free (cs);
626 }
627
628
629 /**
630  * Check a request for a set operation from another peer.
631  *
632  * @param cls the operation state
633  * @param msg the received message
634  * @return #GNUNET_OK if the channel should be kept alive,
635  *         #GNUNET_SYSERR to destroy the channel
636  */
637 static int
638 check_incoming_msg (void *cls,
639                     const struct OperationRequestMessage *msg)
640 {
641   struct Operation *op = cls;
642   struct Listener *listener = op->listener;
643   const struct GNUNET_MessageHeader *nested_context;
644
645   /* double operation request */
646   if (0 != op->suggest_id)
647   {
648     GNUNET_break_op (0);
649     return GNUNET_SYSERR;
650   }
651   /* This should be equivalent to the previous condition, but can't hurt to check twice */
652   if (NULL == op->listener)
653   {
654     GNUNET_break (0);
655     return GNUNET_SYSERR;
656   }
657   if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
658   {
659     GNUNET_break_op (0);
660     return GNUNET_SYSERR;
661   }
662   nested_context = GNUNET_MQ_extract_nested_mh (msg);
663   if ( (NULL != nested_context) &&
664        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
665   {
666     GNUNET_break_op (0);
667     return GNUNET_SYSERR;
668   }
669   return GNUNET_OK;
670 }
671
672
673 /**
674  * Handle a request for a set operation from another peer.  Checks if we
675  * have a listener waiting for such a request (and in that case initiates
676  * asking the listener about accepting the connection). If no listener
677  * is waiting, we queue the operation request in hope that a listener
678  * shows up soon (before timeout).
679  *
680  * This msg is expected as the first and only msg handled through the
681  * non-operation bound virtual table, acceptance of this operation replaces
682  * our virtual table and subsequent msgs would be routed differently (as
683  * we then know what type of operation this is).
684  *
685  * @param cls the operation state
686  * @param msg the received message
687  * @return #GNUNET_OK if the channel should be kept alive,
688  *         #GNUNET_SYSERR to destroy the channel
689  */
690 static void
691 handle_incoming_msg (void *cls,
692                      const struct OperationRequestMessage *msg)
693 {
694   struct Operation *op = cls;
695   struct Listener *listener = op->listener;
696   const struct GNUNET_MessageHeader *nested_context;
697   struct GNUNET_MQ_Envelope *env;
698   struct GNUNET_SET_RequestMessage *cmsg;
699
700   nested_context = GNUNET_MQ_extract_nested_mh (msg);
701   /* Make a copy of the nested_context (application-specific context
702      information that is opaque to set) so we can pass it to the
703      listener later on */
704   if (NULL != nested_context)
705     op->context_msg = GNUNET_copy_message (nested_context);
706   op->remote_element_count = ntohl (msg->element_count);
707   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
708               "Received P2P operation request (op %u, port %s) for active listener\n",
709               (uint32_t) ntohl (msg->operation),
710               GNUNET_h2s (&op->listener->app_id));
711   GNUNET_assert (0 == op->suggest_id);
712   if (0 == suggest_id)
713     suggest_id++;
714   op->suggest_id = suggest_id++;
715   GNUNET_assert (NULL != op->timeout_task);
716   GNUNET_SCHEDULER_cancel (op->timeout_task);
717   op->timeout_task = NULL;
718   env = GNUNET_MQ_msg_nested_mh (cmsg,
719                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
720                                  op->context_msg);
721   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722               "Suggesting incoming request with accept id %u to listener %p of client %p\n",
723               op->suggest_id,
724               listener,
725               listener->cs);
726   cmsg->accept_id = htonl (op->suggest_id);
727   cmsg->peer_id = op->peer;
728   GNUNET_MQ_send (listener->cs->mq,
729                   env);
730   /* NOTE: GNUNET_CADET_receive_done() will be called in
731      #handle_client_accept() */
732 }
733
734
735 /**
736  * Add an element to @a set as specified by @a msg
737  *
738  * @param set set to manipulate
739  * @param msg message specifying the change
740  */
741 static void
742 execute_add (struct Set *set,
743              const struct GNUNET_SET_ElementMessage *msg)
744 {
745   struct GNUNET_SET_Element el;
746   struct ElementEntry *ee;
747   struct GNUNET_HashCode hash;
748
749   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
750   el.size = ntohs (msg->header.size) - sizeof (*msg);
751   el.data = &msg[1];
752   el.element_type = ntohs (msg->element_type);
753   GNUNET_SET_element_hash (&el,
754                            &hash);
755   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
756                                           &hash);
757   if (NULL == ee)
758   {
759     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760                 "Client inserts element %s of size %u\n",
761                 GNUNET_h2s (&hash),
762                 el.size);
763     ee = GNUNET_malloc (el.size + sizeof (*ee));
764     ee->element.size = el.size;
765     GNUNET_memcpy (&ee[1],
766             el.data,
767             el.size);
768     ee->element.data = &ee[1];
769     ee->element.element_type = el.element_type;
770     ee->remote = GNUNET_NO;
771     ee->mutations = NULL;
772     ee->mutations_size = 0;
773     ee->element_hash = hash;
774     GNUNET_break (GNUNET_YES ==
775                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
776                                                      &ee->element_hash,
777                                                      ee,
778                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
779   }
780   else if (GNUNET_YES ==
781            is_element_of_generation (ee,
782                                      set->current_generation,
783                                      set->excluded_generations,
784                                      set->excluded_generations_size))
785   {
786     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787                 "Client inserted element %s of size %u twice (ignored)\n",
788                 GNUNET_h2s (&hash),
789                 el.size);
790
791     /* same element inserted twice */
792     return;
793   }
794
795   {
796     struct MutationEvent mut = {
797       .generation = set->current_generation,
798       .added = GNUNET_YES
799     };
800     GNUNET_array_append (ee->mutations,
801                          ee->mutations_size,
802                          mut);
803   }
804   set->vt->add (set->state,
805                 ee);
806 }
807
808
809 /**
810  * Remove an element from @a set as specified by @a msg
811  *
812  * @param set set to manipulate
813  * @param msg message specifying the change
814  */
815 static void
816 execute_remove (struct Set *set,
817                 const struct GNUNET_SET_ElementMessage *msg)
818 {
819   struct GNUNET_SET_Element el;
820   struct ElementEntry *ee;
821   struct GNUNET_HashCode hash;
822
823   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
824   el.size = ntohs (msg->header.size) - sizeof (*msg);
825   el.data = &msg[1];
826   el.element_type = ntohs (msg->element_type);
827   GNUNET_SET_element_hash (&el, &hash);
828   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
829                                           &hash);
830   if (NULL == ee)
831   {
832     /* Client tried to remove non-existing element. */
833     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834                 "Client removes non-existing element of size %u\n",
835                 el.size);
836     return;
837   }
838   if (GNUNET_NO ==
839       is_element_of_generation (ee,
840                                 set->current_generation,
841                                 set->excluded_generations,
842                                 set->excluded_generations_size))
843   {
844     /* Client tried to remove element twice */
845     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846                 "Client removed element of size %u twice (ignored)\n",
847                 el.size);
848     return;
849   }
850   else
851   {
852     struct MutationEvent mut = {
853       .generation = set->current_generation,
854       .added = GNUNET_NO
855     };
856
857     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858                 "Client removes element of size %u\n",
859                 el.size);
860
861     GNUNET_array_append (ee->mutations,
862                          ee->mutations_size,
863                          mut);
864   }
865   set->vt->remove (set->state,
866                    ee);
867 }
868
869
870 /**
871  * Perform a mutation on a set as specified by the @a msg
872  *
873  * @param set the set to mutate
874  * @param msg specification of what to change
875  */
876 static void
877 execute_mutation (struct Set *set,
878                   const struct GNUNET_SET_ElementMessage *msg)
879 {
880   switch (ntohs (msg->header.type))
881   {
882     case GNUNET_MESSAGE_TYPE_SET_ADD:
883       execute_add (set, msg);
884       break;
885     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
886       execute_remove (set, msg);
887       break;
888     default:
889       GNUNET_break (0);
890   }
891 }
892
893
894 /**
895  * Execute mutations that were delayed on a set because of
896  * pending operations.
897  *
898  * @param set the set to execute mutations on
899  */
900 static void
901 execute_delayed_mutations (struct Set *set)
902 {
903   struct PendingMutation *pm;
904
905   if (0 != set->content->iterator_count)
906     return; /* still cannot do this */
907   while (NULL != (pm = set->content->pending_mutations_head))
908   {
909     GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
910                                  set->content->pending_mutations_tail,
911                                  pm);
912     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913                 "Executing pending mutation on %p.\n",
914                 pm->set);
915     execute_mutation (pm->set,
916                       pm->msg);
917     GNUNET_free (pm->msg);
918     GNUNET_free (pm);
919   }
920 }
921
922
923 /**
924  * Send the next element of a set to the set's client.  The next element is given by
925  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
926  * are no more elements in the set.  The caller must ensure that the set's iterator is
927  * valid.
928  *
929  * The client will acknowledge each received element with a
930  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
931  * #handle_client_iter_ack() will then trigger the next transmission.
932  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
933  *
934  * @param set set that should send its next element to its client
935  */
936 static void
937 send_client_element (struct Set *set)
938 {
939   int ret;
940   struct ElementEntry *ee;
941   struct GNUNET_MQ_Envelope *ev;
942   struct GNUNET_SET_IterResponseMessage *msg;
943
944   GNUNET_assert (NULL != set->iter);
945   do {
946     ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
947                                                        NULL,
948                                                        (const void **) &ee);
949     if (GNUNET_NO == ret)
950     {
951       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952                   "Iteration on %p done.\n",
953                   set);
954       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
955       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
956       set->iter = NULL;
957       set->iteration_id++;
958       GNUNET_assert (set->content->iterator_count > 0);
959       set->content->iterator_count--;
960       execute_delayed_mutations (set);
961       GNUNET_MQ_send (set->cs->mq,
962                       ev);
963       return;
964     }
965     GNUNET_assert (NULL != ee);
966   } while (GNUNET_NO ==
967            is_element_of_generation (ee,
968                                      set->iter_generation,
969                                      set->excluded_generations,
970                                      set->excluded_generations_size));
971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972               "Sending iteration element on %p.\n",
973               set);
974   ev = GNUNET_MQ_msg_extra (msg,
975                             ee->element.size,
976                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
977   GNUNET_memcpy (&msg[1],
978                  ee->element.data,
979                  ee->element.size);
980   msg->element_type = htons (ee->element.element_type);
981   msg->iteration_id = htons (set->iteration_id);
982   GNUNET_MQ_send (set->cs->mq,
983                   ev);
984 }
985
986
987 /**
988  * Called when a client wants to iterate the elements of a set.
989  * Checks if we have a set associated with the client and if we
990  * can right now start an iteration. If all checks out, starts
991  * sending the elements of the set to the client.
992  *
993  * @param cls client that sent the message
994  * @param m message sent by the client
995  */
996 static void
997 handle_client_iterate (void *cls,
998                        const struct GNUNET_MessageHeader *m)
999 {
1000   struct ClientState *cs = cls;
1001   struct Set *set;
1002
1003   if (NULL == (set = cs->set))
1004   {
1005     /* attempt to iterate over a non existing set */
1006     GNUNET_break (0);
1007     GNUNET_SERVICE_client_drop (cs->client);
1008     return;
1009   }
1010   if (NULL != set->iter)
1011   {
1012     /* Only one concurrent iterate-action allowed per set */
1013     GNUNET_break (0);
1014     GNUNET_SERVICE_client_drop (cs->client);
1015     return;
1016   }
1017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018               "Iterating set %p in gen %u with %u content elements\n",
1019               (void *) set,
1020               set->current_generation,
1021               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1022   GNUNET_SERVICE_client_continue (cs->client);
1023   set->content->iterator_count++;
1024   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1025   set->iter_generation = set->current_generation;
1026   send_client_element (set);
1027 }
1028
1029
1030 /**
1031  * Called when a client wants to create a new set.  This is typically
1032  * the first request from a client, and includes the type of set
1033  * operation to be performed.
1034  *
1035  * @param cls client that sent the message
1036  * @param m message sent by the client
1037  */
1038 static void
1039 handle_client_create_set (void *cls,
1040                           const struct GNUNET_SET_CreateMessage *msg)
1041 {
1042   struct ClientState *cs = cls;
1043   struct Set *set;
1044
1045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046               "Client created new set (operation %u)\n",
1047               (uint32_t) ntohl (msg->operation));
1048   if (NULL != cs->set)
1049   {
1050     /* There can only be one set per client */
1051     GNUNET_break (0);
1052     GNUNET_SERVICE_client_drop (cs->client);
1053     return;
1054   }
1055   set = GNUNET_new (struct Set);
1056   switch (ntohl (msg->operation))
1057   {
1058   case GNUNET_SET_OPERATION_INTERSECTION:
1059     set->vt = _GSS_intersection_vt ();
1060     break;
1061   case GNUNET_SET_OPERATION_UNION:
1062     set->vt = _GSS_union_vt ();
1063     break;
1064   default:
1065     GNUNET_free (set);
1066     GNUNET_break (0);
1067     GNUNET_SERVICE_client_drop (cs->client);
1068     return;
1069   }
1070   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1071   set->state = set->vt->create ();
1072   if (NULL == set->state)
1073   {
1074     /* initialization failed (i.e. out of memory) */
1075     GNUNET_free (set);
1076     GNUNET_SERVICE_client_drop (cs->client);
1077     return;
1078   }
1079   set->content = GNUNET_new (struct SetContent);
1080   set->content->refcount = 1;
1081   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1082                                                                  GNUNET_YES);
1083   set->cs = cs;
1084   cs->set = set;
1085   GNUNET_SERVICE_client_continue (cs->client);
1086 }
1087
1088
1089 /**
1090  * Timeout happens iff:
1091  *  - we suggested an operation to our listener,
1092  *    but did not receive a response in time
1093  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1094  *
1095  * @param cls channel context
1096  * @param tc context information (why was this task triggered now)
1097  */
1098 static void
1099 incoming_timeout_cb (void *cls)
1100 {
1101   struct Operation *op = cls;
1102
1103   op->timeout_task = NULL;
1104   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1105               "Remote peer's incoming request timed out\n");
1106   incoming_destroy (op);
1107 }
1108
1109
1110 /**
1111  * Method called whenever another peer has added us to a channel the
1112  * other peer initiated.  Only called (once) upon reception of data
1113  * from a channel we listen on.
1114  *
1115  * The channel context represents the operation itself and gets added
1116  * to a DLL, from where it gets looked up when our local listener
1117  * client responds to a proposed/suggested operation or connects and
1118  * associates with this operation.
1119  *
1120  * @param cls closure
1121  * @param channel new handle to the channel
1122  * @param source peer that started the channel
1123  * @return initial channel context for the channel
1124  *         returns NULL on error
1125  */
1126 static void *
1127 channel_new_cb (void *cls,
1128                 struct GNUNET_CADET_Channel *channel,
1129                 const struct GNUNET_PeerIdentity *source)
1130 {
1131   struct Listener *listener = cls;
1132   struct Operation *op;
1133
1134   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135               "New incoming channel\n");
1136   op = GNUNET_new (struct Operation);
1137   op->listener = listener;
1138   op->peer = *source;
1139   op->channel = channel;
1140   op->mq = GNUNET_CADET_get_mq (op->channel);
1141   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1142                                        UINT32_MAX);
1143   op->timeout_task
1144     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1145                                     &incoming_timeout_cb,
1146                                     op);
1147   GNUNET_CONTAINER_DLL_insert (listener->op_head,
1148                                listener->op_tail,
1149                                op);
1150   return op;
1151 }
1152
1153
1154 /**
1155  * Function called whenever a channel is destroyed.  Should clean up
1156  * any associated state.  It must NOT call
1157  * GNUNET_CADET_channel_destroy() on the channel.
1158  *
1159  * The peer_disconnect function is part of a a virtual table set initially either
1160  * when a peer creates a new channel with us, or once we create
1161  * a new channel ourselves (evaluate).
1162  *
1163  * Once we know the exact type of operation (union/intersection), the vt is
1164  * replaced with an operation specific instance (_GSS_[op]_vt).
1165  *
1166  * @param channel_ctx place where local state associated
1167  *                   with the channel is stored
1168  * @param channel connection to the other end (henceforth invalid)
1169  */
1170 static void
1171 channel_end_cb (void *channel_ctx,
1172                 const struct GNUNET_CADET_Channel *channel)
1173 {
1174   struct Operation *op = channel_ctx;
1175
1176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177               "channel_end_cb called\n");
1178   op->channel = NULL;
1179   if (NULL != op->listener)
1180     incoming_destroy (op);
1181   else if (NULL != op->set)
1182     op->set->vt->channel_death (op);
1183   else
1184     _GSS_operation_destroy (op,
1185                             GNUNET_YES);
1186   GNUNET_free (op);
1187 }
1188
1189
1190 /**
1191  * Function called whenever an MQ-channel's transmission window size changes.
1192  *
1193  * The first callback in an outgoing channel will be with a non-zero value
1194  * and will mean the channel is connected to the destination.
1195  *
1196  * For an incoming channel it will be called immediately after the
1197  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1198  *
1199  * @param cls Channel closure.
1200  * @param channel Connection to the other end (henceforth invalid).
1201  * @param window_size New window size. If the is more messages than buffer size
1202  *                    this value will be negative..
1203  */
1204 static void
1205 channel_window_cb (void *cls,
1206                    const struct GNUNET_CADET_Channel *channel,
1207                    int window_size)
1208 {
1209   /* FIXME: not implemented, we could do flow control here... */
1210 }
1211
1212
1213 /**
1214  * Called when a client wants to create a new listener.
1215  *
1216  * @param cls client that sent the message
1217  * @param msg message sent by the client
1218  */
1219 static void
1220 handle_client_listen (void *cls,
1221                       const struct GNUNET_SET_ListenMessage *msg)
1222 {
1223   struct ClientState *cs = cls;
1224   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1225     GNUNET_MQ_hd_var_size (incoming_msg,
1226                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1227                            struct OperationRequestMessage,
1228                            NULL),
1229     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1230                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1231                            struct IBFMessage,
1232                            NULL),
1233     GNUNET_MQ_hd_var_size (union_p2p_elements,
1234                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1235                            struct GNUNET_SET_ElementMessage,
1236                            NULL),
1237     GNUNET_MQ_hd_var_size (union_p2p_offer,
1238                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1239                            struct GNUNET_MessageHeader,
1240                            NULL),
1241     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1242                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1243                            struct InquiryMessage,
1244                            NULL),
1245     GNUNET_MQ_hd_var_size (union_p2p_demand,
1246                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1247                            struct GNUNET_MessageHeader,
1248                            NULL),
1249     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1250                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1251                              struct GNUNET_MessageHeader,
1252                              NULL),
1253     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1254                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1255                              struct GNUNET_MessageHeader,
1256                              NULL),
1257     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1258                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1259                              struct GNUNET_MessageHeader,
1260                              NULL),
1261     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1262                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1263                              struct GNUNET_MessageHeader,
1264                              NULL),
1265     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1266                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1267                            struct StrataEstimatorMessage,
1268                            NULL),
1269     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1270                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1271                            struct StrataEstimatorMessage,
1272                            NULL),
1273     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1274                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1275                            struct GNUNET_SET_ElementMessage,
1276                            NULL),
1277     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1278                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1279                              struct IntersectionElementInfoMessage,
1280                              NULL),
1281     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1282                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1283                            struct BFMessage,
1284                            NULL),
1285     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1286                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1287                              struct IntersectionDoneMessage,
1288                              NULL),
1289     GNUNET_MQ_handler_end ()
1290   };
1291   struct Listener *listener;
1292
1293   if (NULL != cs->listener)
1294   {
1295     /* max. one active listener per client! */
1296     GNUNET_break (0);
1297     GNUNET_SERVICE_client_drop (cs->client);
1298     return;
1299   }
1300   listener = GNUNET_new (struct Listener);
1301   listener->cs = cs;
1302   listener->app_id = msg->app_id;
1303   listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1304   GNUNET_CONTAINER_DLL_insert (listener_head,
1305                                listener_tail,
1306                                listener);
1307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1308               "New listener created (op %u, port %s)\n",
1309               listener->operation,
1310               GNUNET_h2s (&listener->app_id));
1311   listener->open_port
1312     = GNUNET_CADET_open_port (cadet,
1313                               &msg->app_id,
1314                               &channel_new_cb,
1315                               listener,
1316                               &channel_window_cb,
1317                               &channel_end_cb,
1318                               cadet_handlers);
1319   GNUNET_SERVICE_client_continue (cs->client);
1320 }
1321
1322
1323 /**
1324  * Called when the listening client rejects an operation
1325  * request by another peer.
1326  *
1327  * @param cls client that sent the message
1328  * @param msg message sent by the client
1329  */
1330 static void
1331 handle_client_reject (void *cls,
1332                       const struct GNUNET_SET_RejectMessage *msg)
1333 {
1334   struct ClientState *cs = cls;
1335   struct Operation *op;
1336
1337   op = get_incoming (ntohl (msg->accept_reject_id));
1338   if (NULL == op)
1339   {
1340     /* no matching incoming operation for this reject;
1341        could be that the other peer already disconnected... */
1342     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1343                 "Client rejected unknown operation %u\n",
1344                 (unsigned int) ntohl (msg->accept_reject_id));
1345     GNUNET_SERVICE_client_continue (cs->client);
1346     return;
1347   }
1348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349               "Peer request (op %u, app %s) rejected by client\n",
1350               op->listener->operation,
1351               GNUNET_h2s (&cs->listener->app_id));
1352   GNUNET_CADET_channel_destroy (op->channel);
1353   GNUNET_SERVICE_client_continue (cs->client);
1354 }
1355
1356
1357 /**
1358  * Called when a client wants to add or remove an element to a set it inhabits.
1359  *
1360  * @param cls client that sent the message
1361  * @param msg message sent by the client
1362  */
1363 static int
1364 check_client_mutation (void *cls,
1365                        const struct GNUNET_SET_ElementMessage *msg)
1366 {
1367   /* NOTE: Technically, we should probably check with the
1368      block library whether the element we are given is well-formed */
1369   return GNUNET_OK;
1370 }
1371
1372
1373 /**
1374  * Called when a client wants to add or remove an element to a set it inhabits.
1375  *
1376  * @param cls client that sent the message
1377  * @param msg message sent by the client
1378  */
1379 static void
1380 handle_client_mutation (void *cls,
1381                         const struct GNUNET_SET_ElementMessage *msg)
1382 {
1383   struct ClientState *cs = cls;
1384   struct Set *set;
1385
1386   if (NULL == (set = cs->set))
1387   {
1388     /* client without a set requested an operation */
1389     GNUNET_break (0);
1390     GNUNET_SERVICE_client_drop (cs->client);
1391     return;
1392   }
1393   GNUNET_SERVICE_client_continue (cs->client);
1394
1395   if (0 != set->content->iterator_count)
1396   {
1397     struct PendingMutation *pm;
1398
1399     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1400                 "Scheduling mutation on set\n");
1401     pm = GNUNET_new (struct PendingMutation);
1402     pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1403     pm->set = set;
1404     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1405                                       set->content->pending_mutations_tail,
1406                                       pm);
1407     return;
1408   }
1409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410               "Executing mutation on set\n");
1411   execute_mutation (set,
1412                     msg);
1413 }
1414
1415
1416 /**
1417  * Advance the current generation of a set,
1418  * adding exclusion ranges if necessary.
1419  *
1420  * @param set the set where we want to advance the generation
1421  */
1422 static void
1423 advance_generation (struct Set *set)
1424 {
1425   struct GenerationRange r;
1426
1427   if (set->current_generation == set->content->latest_generation)
1428   {
1429     set->content->latest_generation++;
1430     set->current_generation++;
1431     return;
1432   }
1433
1434   GNUNET_assert (set->current_generation < set->content->latest_generation);
1435
1436   r.start = set->current_generation + 1;
1437   r.end = set->content->latest_generation + 1;
1438   set->content->latest_generation = r.end;
1439   set->current_generation = r.end;
1440   GNUNET_array_append (set->excluded_generations,
1441                        set->excluded_generations_size,
1442                        r);
1443 }
1444
1445
1446 /**
1447  * Called when a client wants to initiate a set operation with another
1448  * peer.  Initiates the CADET connection to the listener and sends the
1449  * request.
1450  *
1451  * @param cls client that sent the message
1452  * @param msg message sent by the client
1453  * @return #GNUNET_OK if the message is well-formed
1454  */
1455 static int
1456 check_client_evaluate (void *cls,
1457                         const struct GNUNET_SET_EvaluateMessage *msg)
1458 {
1459   /* FIXME: suboptimal, even if the context below could be NULL,
1460      there are malformed messages this does not check for... */
1461   return GNUNET_OK;
1462 }
1463
1464
1465 /**
1466  * Called when a client wants to initiate a set operation with another
1467  * peer.  Initiates the CADET connection to the listener and sends the
1468  * request.
1469  *
1470  * @param cls client that sent the message
1471  * @param msg message sent by the client
1472  */
1473 static void
1474 handle_client_evaluate (void *cls,
1475                         const struct GNUNET_SET_EvaluateMessage *msg)
1476 {
1477   struct ClientState *cs = cls;
1478   struct Operation *op = GNUNET_new (struct Operation);
1479   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1480     GNUNET_MQ_hd_var_size (incoming_msg,
1481                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1482                            struct OperationRequestMessage,
1483                            op),
1484     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1485                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1486                            struct IBFMessage,
1487                            op),
1488     GNUNET_MQ_hd_var_size (union_p2p_elements,
1489                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1490                            struct GNUNET_SET_ElementMessage,
1491                            op),
1492     GNUNET_MQ_hd_var_size (union_p2p_offer,
1493                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1494                            struct GNUNET_MessageHeader,
1495                            op),
1496     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1497                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1498                            struct InquiryMessage,
1499                            op),
1500     GNUNET_MQ_hd_var_size (union_p2p_demand,
1501                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1502                            struct GNUNET_MessageHeader,
1503                            op),
1504     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1505                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1506                              struct GNUNET_MessageHeader,
1507                              op),
1508     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1509                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1510                              struct GNUNET_MessageHeader,
1511                              op),
1512     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1513                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1514                              struct GNUNET_MessageHeader,
1515                              op),
1516     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1517                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1518                              struct GNUNET_MessageHeader,
1519                              op),
1520     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1521                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1522                            struct StrataEstimatorMessage,
1523                            op),
1524     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1525                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1526                            struct StrataEstimatorMessage,
1527                            op),
1528     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1529                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1530                            struct GNUNET_SET_ElementMessage,
1531                            op),
1532     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1533                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1534                              struct IntersectionElementInfoMessage,
1535                              op),
1536     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1537                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1538                            struct BFMessage,
1539                            op),
1540     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1541                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1542                              struct IntersectionDoneMessage,
1543                              op),
1544     GNUNET_MQ_handler_end ()
1545   };
1546   struct Set *set;
1547   const struct GNUNET_MessageHeader *context;
1548
1549   if (NULL == (set = cs->set))
1550   {
1551     GNUNET_break (0);
1552     GNUNET_free (op);
1553     GNUNET_SERVICE_client_drop (cs->client);
1554     return;
1555   }
1556   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1557                                        UINT32_MAX);
1558   op->peer = msg->target_peer;
1559   op->result_mode = ntohl (msg->result_mode);
1560   op->client_request_id = ntohl (msg->request_id);
1561   op->byzantine = msg->byzantine;
1562   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1563   op->force_full = msg->force_full;
1564   op->force_delta = msg->force_delta;
1565   context = GNUNET_MQ_extract_nested_mh (msg);
1566
1567   /* Advance generation values, so that
1568      mutations won't interfer with the running operation. */
1569   op->set = set;
1570   op->generation_created = set->current_generation;
1571   advance_generation (set);
1572   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1573                                set->ops_tail,
1574                                op);
1575   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1576               "Creating new CADET channel to port %s for set operation type %u\n",
1577               GNUNET_h2s (&msg->app_id),
1578               set->operation);
1579   op->channel = GNUNET_CADET_channel_create (cadet,
1580                                              op,
1581                                              &msg->target_peer,
1582                                              &msg->app_id,
1583                                              GNUNET_CADET_OPTION_RELIABLE,
1584                                              &channel_window_cb,
1585                                              &channel_end_cb,
1586                                              cadet_handlers);
1587   op->mq = GNUNET_CADET_get_mq (op->channel);
1588   op->state = set->vt->evaluate (op,
1589                                  context);
1590   if (NULL == op->state)
1591   {
1592     GNUNET_break (0);
1593     GNUNET_SERVICE_client_drop (cs->client);
1594     return;
1595   }
1596   GNUNET_SERVICE_client_continue (cs->client);
1597 }
1598
1599
1600 /**
1601  * Handle an ack from a client, and send the next element. Note
1602  * that we only expect acks for set elements, not after the
1603  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1604  *
1605  * @param cls client the client
1606  * @param ack the message
1607  */
1608 static void
1609 handle_client_iter_ack (void *cls,
1610                         const struct GNUNET_SET_IterAckMessage *ack)
1611 {
1612   struct ClientState *cs = cls;
1613   struct Set *set;
1614
1615   if (NULL == (set = cs->set))
1616   {
1617     /* client without a set acknowledged receiving a value */
1618     GNUNET_break (0);
1619     GNUNET_SERVICE_client_drop (cs->client);
1620     return;
1621   }
1622   if (NULL == set->iter)
1623   {
1624     /* client sent an ack, but we were not expecting one (as
1625        set iteration has finished) */
1626     GNUNET_break (0);
1627     GNUNET_SERVICE_client_drop (cs->client);
1628     return;
1629   }
1630   GNUNET_SERVICE_client_continue (cs->client);
1631   if (ntohl (ack->send_more))
1632   {
1633     send_client_element (set);
1634   }
1635   else
1636   {
1637     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1638     set->iter = NULL;
1639     set->iteration_id++;
1640   }
1641 }
1642
1643
1644 /**
1645  * Handle a request from the client to copy a set.
1646  *
1647  * @param cls the client
1648  * @param mh the message
1649  */
1650 static void
1651 handle_client_copy_lazy_prepare (void *cls,
1652                                  const struct GNUNET_MessageHeader *mh)
1653 {
1654   struct ClientState *cs = cls;
1655   struct Set *set;
1656   struct LazyCopyRequest *cr;
1657   struct GNUNET_MQ_Envelope *ev;
1658   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1659
1660   if (NULL == (set = cs->set))
1661   {
1662     /* client without a set requested an operation */
1663     GNUNET_break (0);
1664     GNUNET_SERVICE_client_drop (cs->client);
1665     return;
1666   }
1667   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1668               "Client requested creation of lazy copy\n");
1669   cr = GNUNET_new (struct LazyCopyRequest);
1670   cr->cookie = ++lazy_copy_cookie;
1671   cr->source_set = set;
1672   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1673                                lazy_copy_tail,
1674                                cr);
1675   ev = GNUNET_MQ_msg (resp_msg,
1676                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1677   resp_msg->cookie = cr->cookie;
1678   GNUNET_MQ_send (set->cs->mq,
1679                   ev);
1680   GNUNET_SERVICE_client_continue (cs->client);
1681 }
1682
1683
1684 /**
1685  * Handle a request from the client to connect to a copy of a set.
1686  *
1687  * @param cls the client
1688  * @param msg the message
1689  */
1690 static void
1691 handle_client_copy_lazy_connect (void *cls,
1692                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1693 {
1694   struct ClientState *cs = cls;
1695   struct LazyCopyRequest *cr;
1696   struct Set *set;
1697   int found;
1698
1699   if (NULL != cs->set)
1700   {
1701     /* There can only be one set per client */
1702     GNUNET_break (0);
1703     GNUNET_SERVICE_client_drop (cs->client);
1704     return;
1705   }
1706   found = GNUNET_NO;
1707   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1708   {
1709     if (cr->cookie == msg->cookie)
1710     {
1711       found = GNUNET_YES;
1712       break;
1713     }
1714   }
1715   if (GNUNET_NO == found)
1716   {
1717     /* client asked for copy with cookie we don't know */
1718     GNUNET_break (0);
1719     GNUNET_SERVICE_client_drop (cs->client);
1720     return;
1721   }
1722   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1723                                lazy_copy_tail,
1724                                cr);
1725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726               "Client %p requested use of lazy copy\n",
1727               cs);
1728   set = GNUNET_new (struct Set);
1729   switch (cr->source_set->operation)
1730   {
1731   case GNUNET_SET_OPERATION_INTERSECTION:
1732     set->vt = _GSS_intersection_vt ();
1733     break;
1734   case GNUNET_SET_OPERATION_UNION:
1735     set->vt = _GSS_union_vt ();
1736     break;
1737   default:
1738     GNUNET_assert (0);
1739     return;
1740   }
1741
1742   if (NULL == set->vt->copy_state)
1743   {
1744     /* Lazy copy not supported for this set operation */
1745     GNUNET_break (0);
1746     GNUNET_free (set);
1747     GNUNET_free (cr);
1748     GNUNET_SERVICE_client_drop (cs->client);
1749     return;
1750   }
1751
1752   set->operation = cr->source_set->operation;
1753   set->state = set->vt->copy_state (cr->source_set->state);
1754   set->content = cr->source_set->content;
1755   set->content->refcount++;
1756
1757   set->current_generation = cr->source_set->current_generation;
1758   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1759   set->excluded_generations
1760     = GNUNET_memdup (cr->source_set->excluded_generations,
1761                      set->excluded_generations_size * sizeof (struct GenerationRange));
1762
1763   /* Advance the generation of the new set, so that mutations to the
1764      of the cloned set and the source set are independent. */
1765   advance_generation (set);
1766   set->cs = cs;
1767   cs->set = set;
1768   GNUNET_free (cr);
1769   GNUNET_SERVICE_client_continue (cs->client);
1770 }
1771
1772
1773 /**
1774  * Handle a request from the client to cancel a running set operation.
1775  *
1776  * @param cls the client
1777  * @param msg the message
1778  */
1779 static void
1780 handle_client_cancel (void *cls,
1781                       const struct GNUNET_SET_CancelMessage *msg)
1782 {
1783   struct ClientState *cs = cls;
1784   struct Set *set;
1785   struct Operation *op;
1786   int found;
1787
1788   if (NULL == (set = cs->set))
1789   {
1790     /* client without a set requested an operation */
1791     GNUNET_break (0);
1792     GNUNET_SERVICE_client_drop (cs->client);
1793     return;
1794   }
1795   found = GNUNET_NO;
1796   for (op = set->ops_head; NULL != op; op = op->next)
1797   {
1798     if (op->client_request_id == ntohl (msg->request_id))
1799     {
1800       found = GNUNET_YES;
1801       break;
1802     }
1803   }
1804   if (GNUNET_NO == found)
1805   {
1806     /* It may happen that the operation was already destroyed due to
1807      * the other peer disconnecting.  The client may not know about this
1808      * yet and try to cancel the (just barely non-existent) operation.
1809      * So this is not a hard error.
1810      */
1811     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1812                 "Client canceled non-existent op %u\n",
1813                 (uint32_t) ntohl (msg->request_id));
1814   }
1815   else
1816   {
1817     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1818                 "Client requested cancel for op %u\n",
1819                 (uint32_t) ntohl (msg->request_id));
1820     _GSS_operation_destroy (op,
1821                             GNUNET_YES);
1822   }
1823   GNUNET_SERVICE_client_continue (cs->client);
1824 }
1825
1826
1827 /**
1828  * Handle a request from the client to accept a set operation that
1829  * came from a remote peer.  We forward the accept to the associated
1830  * operation for handling
1831  *
1832  * @param cls the client
1833  * @param msg the message
1834  */
1835 static void
1836 handle_client_accept (void *cls,
1837                       const struct GNUNET_SET_AcceptMessage *msg)
1838 {
1839   struct ClientState *cs = cls;
1840   struct Set *set;
1841   struct Operation *op;
1842   struct GNUNET_SET_ResultMessage *result_message;
1843   struct GNUNET_MQ_Envelope *ev;
1844   struct Listener *listener;
1845
1846   if (NULL == (set = cs->set))
1847   {
1848     /* client without a set requested to accept */
1849     GNUNET_break (0);
1850     GNUNET_SERVICE_client_drop (cs->client);
1851     return;
1852   }
1853   op = get_incoming (ntohl (msg->accept_reject_id));
1854   if (NULL == op)
1855   {
1856     /* It is not an error if the set op does not exist -- it may
1857      * have been destroyed when the partner peer disconnected. */
1858     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1859                 "Client %p accepted request %u of listener %p that is no longer active\n",
1860                 cs,
1861                 ntohl (msg->accept_reject_id),
1862                 cs->listener);
1863     ev = GNUNET_MQ_msg (result_message,
1864                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1865     result_message->request_id = msg->request_id;
1866     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1867     GNUNET_MQ_send (set->cs->mq,
1868                     ev);
1869     GNUNET_SERVICE_client_continue (cs->client);
1870     return;
1871   }
1872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1873               "Client accepting request %u\n",
1874               (uint32_t) ntohl (msg->accept_reject_id));
1875   listener = op->listener;
1876   op->listener = NULL;
1877   GNUNET_CONTAINER_DLL_remove (listener->op_head,
1878                                listener->op_tail,
1879                                op);
1880   op->set = set;
1881   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1882                                set->ops_tail,
1883                                op);
1884   op->client_request_id = ntohl (msg->request_id);
1885   op->result_mode = ntohl (msg->result_mode);
1886   op->byzantine = msg->byzantine;
1887   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1888   op->force_full = msg->force_full;
1889   op->force_delta = msg->force_delta;
1890
1891   /* Advance generation values, so that future mutations do not
1892      interfer with the running operation. */
1893   op->generation_created = set->current_generation;
1894   advance_generation (set);
1895   GNUNET_assert (NULL == op->state);
1896   op->state = set->vt->accept (op);
1897   if (NULL == op->state)
1898   {
1899     GNUNET_break (0);
1900     GNUNET_SERVICE_client_drop (cs->client);
1901     return;
1902   }
1903   /* Now allow CADET to continue, as we did not do this in
1904      #handle_incoming_msg (as we wanted to first see if the
1905      local client would accept the request). */
1906   GNUNET_CADET_receive_done (op->channel);
1907   GNUNET_SERVICE_client_continue (cs->client);
1908 }
1909
1910
1911 /**
1912  * Called to clean up, after a shutdown has been requested.
1913  *
1914  * @param cls closure, NULL
1915  */
1916 static void
1917 shutdown_task (void *cls)
1918 {
1919   /* Delay actual shutdown to allow service to disconnect clients */
1920   if (NULL != cadet)
1921   {
1922     GNUNET_CADET_disconnect (cadet);
1923     cadet = NULL;
1924   }
1925   GNUNET_STATISTICS_destroy (_GSS_statistics,
1926                              GNUNET_YES);
1927   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928               "handled shutdown request\n");
1929 }
1930
1931
1932 /**
1933  * Function called by the service's run
1934  * method to run service-specific setup code.
1935  *
1936  * @param cls closure
1937  * @param cfg configuration to use
1938  * @param service the initialized service
1939  */
1940 static void
1941 run (void *cls,
1942      const struct GNUNET_CONFIGURATION_Handle *cfg,
1943      struct GNUNET_SERVICE_Handle *service)
1944 {
1945   /* FIXME: need to modify SERVICE (!) API to allow
1946      us to run a shutdown task *after* clients were
1947      forcefully disconnected! */
1948   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1949                                  NULL);
1950   _GSS_statistics = GNUNET_STATISTICS_create ("set",
1951                                               cfg);
1952   cadet = GNUNET_CADET_connect (cfg);
1953   if (NULL == cadet)
1954   {
1955     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1956                 _("Could not connect to CADET service\n"));
1957     GNUNET_SCHEDULER_shutdown ();
1958     return;
1959   }
1960 }
1961
1962
1963 /**
1964  * Define "main" method using service macro.
1965  */
1966 GNUNET_SERVICE_MAIN
1967 ("set",
1968  GNUNET_SERVICE_OPTION_NONE,
1969  &run,
1970  &client_connect_cb,
1971  &client_disconnect_cb,
1972  NULL,
1973  GNUNET_MQ_hd_fixed_size (client_accept,
1974                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1975                           struct GNUNET_SET_AcceptMessage,
1976                           NULL),
1977  GNUNET_MQ_hd_fixed_size (client_iter_ack,
1978                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1979                           struct GNUNET_SET_IterAckMessage,
1980                           NULL),
1981  GNUNET_MQ_hd_var_size (client_mutation,
1982                         GNUNET_MESSAGE_TYPE_SET_ADD,
1983                         struct GNUNET_SET_ElementMessage,
1984                         NULL),
1985  GNUNET_MQ_hd_fixed_size (client_create_set,
1986                           GNUNET_MESSAGE_TYPE_SET_CREATE,
1987                           struct GNUNET_SET_CreateMessage,
1988                           NULL),
1989  GNUNET_MQ_hd_fixed_size (client_iterate,
1990                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1991                           struct GNUNET_MessageHeader,
1992                           NULL),
1993  GNUNET_MQ_hd_var_size (client_evaluate,
1994                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1995                         struct GNUNET_SET_EvaluateMessage,
1996                         NULL),
1997  GNUNET_MQ_hd_fixed_size (client_listen,
1998                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
1999                           struct GNUNET_SET_ListenMessage,
2000                           NULL),
2001  GNUNET_MQ_hd_fixed_size (client_reject,
2002                           GNUNET_MESSAGE_TYPE_SET_REJECT,
2003                           struct GNUNET_SET_RejectMessage,
2004                           NULL),
2005  GNUNET_MQ_hd_var_size (client_mutation,
2006                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
2007                         struct GNUNET_SET_ElementMessage,
2008                         NULL),
2009  GNUNET_MQ_hd_fixed_size (client_cancel,
2010                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2011                           struct GNUNET_SET_CancelMessage,
2012                           NULL),
2013  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2014                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2015                           struct GNUNET_MessageHeader,
2016                           NULL),
2017  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2018                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2019                           struct GNUNET_SET_CopyLazyConnectMessage,
2020                           NULL),
2021  GNUNET_MQ_handler_end ());
2022
2023
2024 /* end of gnunet-service-set.c */