fix uninit listener field
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
31
32 /**
33  * How long do we hold on to an incoming channel if there is
34  * no local listener before giving up?
35  */
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38
39 /**
40  * Lazy copy requests made by a client.
41  */
42 struct LazyCopyRequest
43 {
44   /**
45    * Kept in a DLL.
46    */
47   struct LazyCopyRequest *prev;
48
49   /**
50    * Kept in a DLL.
51    */
52   struct LazyCopyRequest *next;
53
54   /**
55    * Which set are we supposed to copy?
56    */
57   struct Set *source_set;
58
59   /**
60    * Cookie identifying the request.
61    */
62   uint32_t cookie;
63
64 };
65
66
67 /**
68  * A listener is inhabited by a client, and waits for evaluation
69  * requests from remote peers.
70  */
71 struct Listener
72 {
73   /**
74    * Listeners are held in a doubly linked list.
75    */
76   struct Listener *next;
77
78   /**
79    * Listeners are held in a doubly linked list.
80    */
81   struct Listener *prev;
82
83   /**
84    * Head of DLL of operations this listener is responsible for.
85    * Once the client has accepted/declined the operation, the
86    * operation is moved to the respective set's operation DLLS.
87    */
88   struct Operation *op_head;
89
90   /**
91    * Tail of DLL of operations this listener is responsible for.
92    * Once the client has accepted/declined the operation, the
93    * operation is moved to the respective set's operation DLLS.
94    */
95   struct Operation *op_tail;
96
97   /**
98    * Client that owns the listener.
99    * Only one client may own a listener.
100    */
101   struct ClientState *cs;
102
103   /**
104    * The port we are listening on with CADET.
105    */
106   struct GNUNET_CADET_Port *open_port;
107
108   /**
109    * Application ID for the operation, used to distinguish
110    * multiple operations of the same type with the same peer.
111    */
112   struct GNUNET_HashCode app_id;
113
114   /**
115    * The type of the operation.
116    */
117   enum GNUNET_SET_OperationType operation;
118 };
119
120
121 /**
122  * Handle to the cadet service, used to listen for and connect to
123  * remote peers.
124  */
125 static struct GNUNET_CADET_Handle *cadet;
126
127 /**
128  * DLL of lazy copy requests by this client.
129  */
130 static struct LazyCopyRequest *lazy_copy_head;
131
132 /**
133  * DLL of lazy copy requests by this client.
134  */
135 static struct LazyCopyRequest *lazy_copy_tail;
136
137 /**
138  * Generator for unique cookie we set per lazy copy request.
139  */
140 static uint32_t lazy_copy_cookie;
141
142 /**
143  * Statistics handle.
144  */
145 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
146
147 /**
148  * Listeners are held in a doubly linked list.
149  */
150 static struct Listener *listener_head;
151
152 /**
153  * Listeners are held in a doubly linked list.
154  */
155 static struct Listener *listener_tail;
156
157 /**
158  * Number of active clients.
159  */
160 static unsigned int num_clients;
161
162 /**
163  * Are we in shutdown? if #GNUNET_YES and the number of clients
164  * drops to zero, disconnect from CADET.
165  */
166 static int in_shutdown;
167
168 /**
169  * Counter for allocating unique IDs for clients, used to identify
170  * incoming operation requests from remote peers, that the client can
171  * choose to accept or refuse.  0 must not be used (reserved for
172  * uninitialized).
173  */
174 static uint32_t suggest_id;
175
176
177 /**
178  * Get the incoming socket associated with the given id.
179  *
180  * @param listener the listener to look in
181  * @param id id to look for
182  * @return the incoming socket associated with the id,
183  *         or NULL if there is none
184  */
185 static struct Operation *
186 get_incoming (uint32_t id)
187 {
188   for (struct Listener *listener = listener_head;
189        NULL != listener;
190        listener = listener->next)
191   {
192     for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
193       if (op->suggest_id == id)
194         return op;
195   }
196   return NULL;
197 }
198
199
200 /**
201  * Destroy an incoming request from a remote peer
202  *
203  * @param op remote request to destroy
204  */
205 static void
206 incoming_destroy (struct Operation *op)
207 {
208   struct Listener *listener;
209   struct GNUNET_CADET_Channel *channel;
210
211   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
212               "Destroying incoming operation %p\n",
213               op);
214   if (NULL != (listener = op->listener))
215   {
216     GNUNET_CONTAINER_DLL_remove (listener->op_head,
217                                  listener->op_tail,
218                                  op);
219     op->listener = NULL;
220   }
221   if (NULL != op->timeout_task)
222   {
223     GNUNET_SCHEDULER_cancel (op->timeout_task);
224     op->timeout_task = NULL;
225   }
226   if (NULL != (channel = op->channel))
227   {
228     op->channel = NULL;
229     GNUNET_CADET_channel_destroy (channel);
230   }
231 }
232
233
234 /**
235  * Context for the #garbage_collect_cb().
236  */
237 struct GarbageContext
238 {
239
240   /**
241    * Map for which we are garbage collecting removed elements.
242    */
243   struct GNUNET_CONTAINER_MultiHashMap *map;
244
245   /**
246    * Lowest generation for which an operation is still pending.
247    */
248   unsigned int min_op_generation;
249
250   /**
251    * Largest generation for which an operation is still pending.
252    */
253   unsigned int max_op_generation;
254
255 };
256
257
258 /**
259  * Function invoked to check if an element can be removed from
260  * the set's history because it is no longer needed.
261  *
262  * @param cls the `struct GarbageContext *`
263  * @param key key of the element in the map
264  * @param value the `struct ElementEntry *`
265  * @return #GNUNET_OK (continue to iterate)
266  */
267 static int
268 garbage_collect_cb (void *cls,
269                     const struct GNUNET_HashCode *key,
270                     void *value)
271 {
272   //struct GarbageContext *gc = cls;
273   //struct ElementEntry *ee = value;
274
275   //if (GNUNET_YES != ee->removed)
276   //  return GNUNET_OK;
277   //if ( (gc->max_op_generation < ee->generation_added) ||
278   //     (ee->generation_removed > gc->min_op_generation) )
279   //{
280   //  GNUNET_assert (GNUNET_YES ==
281   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
282   //                                                       key,
283   //                                                       ee));
284   //  GNUNET_free (ee);
285   //}
286   return GNUNET_OK;
287 }
288
289
290 /**
291  * Collect and destroy elements that are not needed anymore, because
292  * their lifetime (as determined by their generation) does not overlap
293  * with any active set operation.
294  *
295  * @param set set to garbage collect
296  */
297 static void
298 collect_generation_garbage (struct Set *set)
299 {
300   struct GarbageContext gc;
301
302   gc.min_op_generation = UINT_MAX;
303   gc.max_op_generation = 0;
304   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
305   {
306     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
307                                        op->generation_created);
308     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
309                                        op->generation_created);
310   }
311   gc.map = set->content->elements;
312   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
313                                          &garbage_collect_cb,
314                                          &gc);
315 }
316
317
318 /**
319  * Is @a generation in the range of exclusions?
320  *
321  * @param generation generation to query
322  * @param excluded array of generations where the element is excluded
323  * @param excluded_size length of the @a excluded array
324  * @return #GNUNET_YES if @a generation is in any of the ranges
325  */
326 static int
327 is_excluded_generation (unsigned int generation,
328                         struct GenerationRange *excluded,
329                         unsigned int excluded_size)
330 {
331   for (unsigned int i = 0; i < excluded_size; i++)
332     if ( (generation >= excluded[i].start) &&
333          (generation < excluded[i].end) )
334       return GNUNET_YES;
335   return GNUNET_NO;
336 }
337
338
339 /**
340  * Is element @a ee part of the set during @a query_generation?
341  *
342  * @param ee element to test
343  * @param query_generation generation to query
344  * @param excluded array of generations where the element is excluded
345  * @param excluded_size length of the @a excluded array
346  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
347  */
348 static int
349 is_element_of_generation (struct ElementEntry *ee,
350                           unsigned int query_generation,
351                           struct GenerationRange *excluded,
352                           unsigned int excluded_size)
353 {
354   struct MutationEvent *mut;
355   int is_present;
356
357   GNUNET_assert (NULL != ee->mutations);
358   if (GNUNET_YES ==
359       is_excluded_generation (query_generation,
360                               excluded,
361                               excluded_size))
362   {
363     GNUNET_break (0);
364     return GNUNET_NO;
365   }
366
367   is_present = GNUNET_NO;
368
369   /* Could be made faster with binary search, but lists
370      are small, so why bother. */
371   for (unsigned int i = 0; i < ee->mutations_size; i++)
372   {
373     mut = &ee->mutations[i];
374
375     if (mut->generation > query_generation)
376     {
377       /* The mutation doesn't apply to our generation
378          anymore.  We can'b break here, since mutations aren't
379          sorted by generation. */
380       continue;
381     }
382
383     if (GNUNET_YES ==
384         is_excluded_generation (mut->generation,
385                                 excluded,
386                                 excluded_size))
387     {
388       /* The generation is excluded (because it belongs to another
389          fork via a lazy copy) and thus mutations aren't considered
390          for membership testing. */
391       continue;
392     }
393
394     /* This would be an inconsistency in how we manage mutations. */
395     if ( (GNUNET_YES == is_present) &&
396          (GNUNET_YES == mut->added) )
397       GNUNET_assert (0);
398     /* Likewise. */
399     if ( (GNUNET_NO == is_present) &&
400          (GNUNET_NO == mut->added) )
401       GNUNET_assert (0);
402
403     is_present = mut->added;
404   }
405
406   return is_present;
407 }
408
409
410 /**
411  * Is element @a ee part of the set used by @a op?
412  *
413  * @param ee element to test
414  * @param op operation the defines the set and its generation
415  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
416  */
417 int
418 _GSS_is_element_of_operation (struct ElementEntry *ee,
419                               struct Operation *op)
420 {
421   return is_element_of_generation (ee,
422                                    op->generation_created,
423                                    op->set->excluded_generations,
424                                    op->set->excluded_generations_size);
425 }
426
427
428 /**
429  * Destroy the given operation.  Used for any operation where both
430  * peers were known and that thus actually had a vt and channel.  Must
431  * not be used for operations where 'listener' is still set and we do
432  * not know the other peer.
433  *
434  * Call the implementation-specific cancel function of the operation.
435  * Disconnects from the remote peer.  Does not disconnect the client,
436  * as there may be multiple operations per set.
437  *
438  * @param op operation to destroy
439  * @param gc #GNUNET_YES to perform garbage collection on the set
440  */
441 void
442 _GSS_operation_destroy (struct Operation *op,
443                         int gc)
444 {
445   struct Set *set = op->set;
446   struct GNUNET_CADET_Channel *channel;
447
448   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449               "Destroying operation %p\n",
450               op);
451   GNUNET_assert (NULL == op->listener);
452   if (NULL != op->state)
453   {
454     set->vt->cancel (op);
455     op->state = NULL;
456   }
457   if (NULL != set)
458   {
459     GNUNET_CONTAINER_DLL_remove (set->ops_head,
460                                  set->ops_tail,
461                                  op);
462     op->set = NULL;
463   }
464   if (NULL != op->context_msg)
465   {
466     GNUNET_free (op->context_msg);
467     op->context_msg = NULL;
468   }
469   if (NULL != (channel = op->channel))
470   {
471     /* This will free op; called conditionally as this helper function
472        is also called from within the channel disconnect handler. */
473     op->channel = NULL;
474     GNUNET_CADET_channel_destroy (channel);
475   }
476   if ( (NULL != set) &&
477        (GNUNET_YES == gc) )
478     collect_generation_garbage (set);
479   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
480    * there was a channel end handler that will free 'op' on the call stack. */
481 }
482
483
484 /**
485  * Callback called when a client connects to the service.
486  *
487  * @param cls closure for the service
488  * @param c the new client that connected to the service
489  * @param mq the message queue used to send messages to the client
490  * @return @a `struct ClientState`
491  */
492 static void *
493 client_connect_cb (void *cls,
494                    struct GNUNET_SERVICE_Client *c,
495                    struct GNUNET_MQ_Handle *mq)
496 {
497   struct ClientState *cs;
498
499   num_clients++;
500   cs = GNUNET_new (struct ClientState);
501   cs->client = c;
502   cs->mq = mq;
503   return cs;
504 }
505
506
507 /**
508  * Iterator over hash map entries to free element entries.
509  *
510  * @param cls closure
511  * @param key current key code
512  * @param value a `struct ElementEntry *` to be free'd
513  * @return #GNUNET_YES (continue to iterate)
514  */
515 static int
516 destroy_elements_iterator (void *cls,
517                            const struct GNUNET_HashCode *key,
518                            void *value)
519 {
520   struct ElementEntry *ee = value;
521
522   GNUNET_free_non_null (ee->mutations);
523   GNUNET_free (ee);
524   return GNUNET_YES;
525 }
526
527
528 /**
529  * Clean up after a client has disconnected
530  *
531  * @param cls closure, unused
532  * @param client the client to clean up after
533  * @param internal_cls the `struct ClientState`
534  */
535 static void
536 client_disconnect_cb (void *cls,
537                       struct GNUNET_SERVICE_Client *client,
538                       void *internal_cls)
539 {
540   struct ClientState *cs = internal_cls;
541   struct Operation *op;
542   struct Listener *listener;
543   struct Set *set;
544
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546               "Client disconnected, cleaning up\n");
547   if (NULL != (set = cs->set))
548   {
549     struct SetContent *content = set->content;
550     struct PendingMutation *pm;
551     struct PendingMutation *pm_current;
552     struct LazyCopyRequest *lcr;
553
554     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555                 "Destroying client's set\n");
556     /* Destroy pending set operations */
557     while (NULL != set->ops_head)
558       _GSS_operation_destroy (set->ops_head,
559                               GNUNET_NO);
560
561     /* Destroy operation-specific state */
562     GNUNET_assert (NULL != set->state);
563     set->vt->destroy_set (set->state);
564     set->state = NULL;
565
566     /* Clean up ongoing iterations */
567     if (NULL != set->iter)
568     {
569       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
570       set->iter = NULL;
571       set->iteration_id++;
572     }
573
574     /* discard any pending mutations that reference this set */
575     pm = content->pending_mutations_head;
576     while (NULL != pm)
577     {
578       pm_current = pm;
579       pm = pm->next;
580       if (pm_current->set == set)
581       {
582         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
583                                      content->pending_mutations_tail,
584                                      pm_current);
585         GNUNET_free (pm_current);
586       }
587     }
588
589     /* free set content (or at least decrement RC) */
590     set->content = NULL;
591     GNUNET_assert (0 != content->refcount);
592     content->refcount--;
593     if (0 == content->refcount)
594     {
595       GNUNET_assert (NULL != content->elements);
596       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
597                                              &destroy_elements_iterator,
598                                              NULL);
599       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
600       content->elements = NULL;
601       GNUNET_free (content);
602     }
603     GNUNET_free_non_null (set->excluded_generations);
604     set->excluded_generations = NULL;
605
606     /* remove set from pending copy requests */
607     lcr = lazy_copy_head;
608     while (NULL != lcr)
609     {
610       struct LazyCopyRequest *lcr_current = lcr;
611
612       lcr = lcr->next;
613       if (lcr_current->source_set == set)
614       {
615         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
616                                      lazy_copy_tail,
617                                      lcr_current);
618         GNUNET_free (lcr_current);
619       }
620     }
621     GNUNET_free (set);
622   }
623
624   if (NULL != (listener = cs->listener))
625   {
626     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627                 "Destroying client's listener\n");
628     GNUNET_CADET_close_port (listener->open_port);
629     listener->open_port = NULL;
630     while (NULL != (op = listener->op_head))
631       incoming_destroy (op);
632     GNUNET_CONTAINER_DLL_remove (listener_head,
633                                  listener_tail,
634                                  listener);
635     GNUNET_free (listener);
636   }
637   GNUNET_free (cs);
638   num_clients--;
639   if ( (GNUNET_YES == in_shutdown) &&
640        (0 == num_clients) )
641   {
642     if (NULL != cadet)
643     {
644       GNUNET_CADET_disconnect (cadet);
645       cadet = NULL;
646     }
647   }
648 }
649
650
651 /**
652  * Check a request for a set operation from another peer.
653  *
654  * @param cls the operation state
655  * @param msg the received message
656  * @return #GNUNET_OK if the channel should be kept alive,
657  *         #GNUNET_SYSERR to destroy the channel
658  */
659 static int
660 check_incoming_msg (void *cls,
661                     const struct OperationRequestMessage *msg)
662 {
663   struct Operation *op = cls;
664   struct Listener *listener = op->listener;
665   const struct GNUNET_MessageHeader *nested_context;
666
667   /* double operation request */
668   if (0 != op->suggest_id)
669   {
670     GNUNET_break_op (0);
671     return GNUNET_SYSERR;
672   }
673   /* This should be equivalent to the previous condition, but can't hurt to check twice */
674   if (NULL == op->listener)
675   {
676     GNUNET_break (0);
677     return GNUNET_SYSERR;
678   }
679   if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
680   {
681     GNUNET_break_op (0);
682     return GNUNET_SYSERR;
683   }
684   nested_context = GNUNET_MQ_extract_nested_mh (msg);
685   if ( (NULL != nested_context) &&
686        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
687   {
688     GNUNET_break_op (0);
689     return GNUNET_SYSERR;
690   }
691   return GNUNET_OK;
692 }
693
694
695 /**
696  * Handle a request for a set operation from another peer.  Checks if we
697  * have a listener waiting for such a request (and in that case initiates
698  * asking the listener about accepting the connection). If no listener
699  * is waiting, we queue the operation request in hope that a listener
700  * shows up soon (before timeout).
701  *
702  * This msg is expected as the first and only msg handled through the
703  * non-operation bound virtual table, acceptance of this operation replaces
704  * our virtual table and subsequent msgs would be routed differently (as
705  * we then know what type of operation this is).
706  *
707  * @param cls the operation state
708  * @param msg the received message
709  * @return #GNUNET_OK if the channel should be kept alive,
710  *         #GNUNET_SYSERR to destroy the channel
711  */
712 static void
713 handle_incoming_msg (void *cls,
714                      const struct OperationRequestMessage *msg)
715 {
716   struct Operation *op = cls;
717   struct Listener *listener = op->listener;
718   const struct GNUNET_MessageHeader *nested_context;
719   struct GNUNET_MQ_Envelope *env;
720   struct GNUNET_SET_RequestMessage *cmsg;
721
722   nested_context = GNUNET_MQ_extract_nested_mh (msg);
723   /* Make a copy of the nested_context (application-specific context
724      information that is opaque to set) so we can pass it to the
725      listener later on */
726   if (NULL != nested_context)
727     op->context_msg = GNUNET_copy_message (nested_context);
728   op->remote_element_count = ntohl (msg->element_count);
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "Received P2P operation request (op %u, port %s) for active listener\n",
731               (uint32_t) ntohl (msg->operation),
732               GNUNET_h2s (&op->listener->app_id));
733   GNUNET_assert (0 == op->suggest_id);
734   if (0 == suggest_id)
735     suggest_id++;
736   op->suggest_id = suggest_id++;
737   GNUNET_assert (NULL != op->timeout_task);
738   GNUNET_SCHEDULER_cancel (op->timeout_task);
739   op->timeout_task = NULL;
740   env = GNUNET_MQ_msg_nested_mh (cmsg,
741                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
742                                  op->context_msg);
743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744               "Suggesting incoming request with accept id %u to listener %p of client %p\n",
745               op->suggest_id,
746               listener,
747               listener->cs);
748   cmsg->accept_id = htonl (op->suggest_id);
749   cmsg->peer_id = op->peer;
750   GNUNET_MQ_send (listener->cs->mq,
751                   env);
752   /* NOTE: GNUNET_CADET_receive_done() will be called in
753      #handle_client_accept() */
754 }
755
756
757 /**
758  * Add an element to @a set as specified by @a msg
759  *
760  * @param set set to manipulate
761  * @param msg message specifying the change
762  */
763 static void
764 execute_add (struct Set *set,
765              const struct GNUNET_SET_ElementMessage *msg)
766 {
767   struct GNUNET_SET_Element el;
768   struct ElementEntry *ee;
769   struct GNUNET_HashCode hash;
770
771   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
772   el.size = ntohs (msg->header.size) - sizeof (*msg);
773   el.data = &msg[1];
774   el.element_type = ntohs (msg->element_type);
775   GNUNET_SET_element_hash (&el,
776                            &hash);
777   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
778                                           &hash);
779   if (NULL == ee)
780   {
781     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782                 "Client inserts element %s of size %u\n",
783                 GNUNET_h2s (&hash),
784                 el.size);
785     ee = GNUNET_malloc (el.size + sizeof (*ee));
786     ee->element.size = el.size;
787     GNUNET_memcpy (&ee[1],
788             el.data,
789             el.size);
790     ee->element.data = &ee[1];
791     ee->element.element_type = el.element_type;
792     ee->remote = GNUNET_NO;
793     ee->mutations = NULL;
794     ee->mutations_size = 0;
795     ee->element_hash = hash;
796     GNUNET_break (GNUNET_YES ==
797                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
798                                                      &ee->element_hash,
799                                                      ee,
800                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
801   }
802   else if (GNUNET_YES ==
803            is_element_of_generation (ee,
804                                      set->current_generation,
805                                      set->excluded_generations,
806                                      set->excluded_generations_size))
807   {
808     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809                 "Client inserted element %s of size %u twice (ignored)\n",
810                 GNUNET_h2s (&hash),
811                 el.size);
812
813     /* same element inserted twice */
814     return;
815   }
816
817   {
818     struct MutationEvent mut = {
819       .generation = set->current_generation,
820       .added = GNUNET_YES
821     };
822     GNUNET_array_append (ee->mutations,
823                          ee->mutations_size,
824                          mut);
825   }
826   set->vt->add (set->state,
827                 ee);
828 }
829
830
831 /**
832  * Remove an element from @a set as specified by @a msg
833  *
834  * @param set set to manipulate
835  * @param msg message specifying the change
836  */
837 static void
838 execute_remove (struct Set *set,
839                 const struct GNUNET_SET_ElementMessage *msg)
840 {
841   struct GNUNET_SET_Element el;
842   struct ElementEntry *ee;
843   struct GNUNET_HashCode hash;
844
845   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
846   el.size = ntohs (msg->header.size) - sizeof (*msg);
847   el.data = &msg[1];
848   el.element_type = ntohs (msg->element_type);
849   GNUNET_SET_element_hash (&el, &hash);
850   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
851                                           &hash);
852   if (NULL == ee)
853   {
854     /* Client tried to remove non-existing element. */
855     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856                 "Client removes non-existing element of size %u\n",
857                 el.size);
858     return;
859   }
860   if (GNUNET_NO ==
861       is_element_of_generation (ee,
862                                 set->current_generation,
863                                 set->excluded_generations,
864                                 set->excluded_generations_size))
865   {
866     /* Client tried to remove element twice */
867     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868                 "Client removed element of size %u twice (ignored)\n",
869                 el.size);
870     return;
871   }
872   else
873   {
874     struct MutationEvent mut = {
875       .generation = set->current_generation,
876       .added = GNUNET_NO
877     };
878
879     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880                 "Client removes element of size %u\n",
881                 el.size);
882
883     GNUNET_array_append (ee->mutations,
884                          ee->mutations_size,
885                          mut);
886   }
887   set->vt->remove (set->state,
888                    ee);
889 }
890
891
892 /**
893  * Perform a mutation on a set as specified by the @a msg
894  *
895  * @param set the set to mutate
896  * @param msg specification of what to change
897  */
898 static void
899 execute_mutation (struct Set *set,
900                   const struct GNUNET_SET_ElementMessage *msg)
901 {
902   switch (ntohs (msg->header.type))
903   {
904     case GNUNET_MESSAGE_TYPE_SET_ADD:
905       execute_add (set, msg);
906       break;
907     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
908       execute_remove (set, msg);
909       break;
910     default:
911       GNUNET_break (0);
912   }
913 }
914
915
916 /**
917  * Execute mutations that were delayed on a set because of
918  * pending operations.
919  *
920  * @param set the set to execute mutations on
921  */
922 static void
923 execute_delayed_mutations (struct Set *set)
924 {
925   struct PendingMutation *pm;
926
927   if (0 != set->content->iterator_count)
928     return; /* still cannot do this */
929   while (NULL != (pm = set->content->pending_mutations_head))
930   {
931     GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
932                                  set->content->pending_mutations_tail,
933                                  pm);
934     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935                 "Executing pending mutation on %p.\n",
936                 pm->set);
937     execute_mutation (pm->set,
938                       pm->msg);
939     GNUNET_free (pm->msg);
940     GNUNET_free (pm);
941   }
942 }
943
944
945 /**
946  * Send the next element of a set to the set's client.  The next element is given by
947  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
948  * are no more elements in the set.  The caller must ensure that the set's iterator is
949  * valid.
950  *
951  * The client will acknowledge each received element with a
952  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
953  * #handle_client_iter_ack() will then trigger the next transmission.
954  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
955  *
956  * @param set set that should send its next element to its client
957  */
958 static void
959 send_client_element (struct Set *set)
960 {
961   int ret;
962   struct ElementEntry *ee;
963   struct GNUNET_MQ_Envelope *ev;
964   struct GNUNET_SET_IterResponseMessage *msg;
965
966   GNUNET_assert (NULL != set->iter);
967   do {
968     ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
969                                                        NULL,
970                                                        (const void **) &ee);
971     if (GNUNET_NO == ret)
972     {
973       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974                   "Iteration on %p done.\n",
975                   set);
976       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
977       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
978       set->iter = NULL;
979       set->iteration_id++;
980       GNUNET_assert (set->content->iterator_count > 0);
981       set->content->iterator_count--;
982       execute_delayed_mutations (set);
983       GNUNET_MQ_send (set->cs->mq,
984                       ev);
985       return;
986     }
987     GNUNET_assert (NULL != ee);
988   } while (GNUNET_NO ==
989            is_element_of_generation (ee,
990                                      set->iter_generation,
991                                      set->excluded_generations,
992                                      set->excluded_generations_size));
993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994               "Sending iteration element on %p.\n",
995               set);
996   ev = GNUNET_MQ_msg_extra (msg,
997                             ee->element.size,
998                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
999   GNUNET_memcpy (&msg[1],
1000                  ee->element.data,
1001                  ee->element.size);
1002   msg->element_type = htons (ee->element.element_type);
1003   msg->iteration_id = htons (set->iteration_id);
1004   GNUNET_MQ_send (set->cs->mq,
1005                   ev);
1006 }
1007
1008
1009 /**
1010  * Called when a client wants to iterate the elements of a set.
1011  * Checks if we have a set associated with the client and if we
1012  * can right now start an iteration. If all checks out, starts
1013  * sending the elements of the set to the client.
1014  *
1015  * @param cls client that sent the message
1016  * @param m message sent by the client
1017  */
1018 static void
1019 handle_client_iterate (void *cls,
1020                        const struct GNUNET_MessageHeader *m)
1021 {
1022   struct ClientState *cs = cls;
1023   struct Set *set;
1024
1025   if (NULL == (set = cs->set))
1026   {
1027     /* attempt to iterate over a non existing set */
1028     GNUNET_break (0);
1029     GNUNET_SERVICE_client_drop (cs->client);
1030     return;
1031   }
1032   if (NULL != set->iter)
1033   {
1034     /* Only one concurrent iterate-action allowed per set */
1035     GNUNET_break (0);
1036     GNUNET_SERVICE_client_drop (cs->client);
1037     return;
1038   }
1039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040               "Iterating set %p in gen %u with %u content elements\n",
1041               (void *) set,
1042               set->current_generation,
1043               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1044   GNUNET_SERVICE_client_continue (cs->client);
1045   set->content->iterator_count++;
1046   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1047   set->iter_generation = set->current_generation;
1048   send_client_element (set);
1049 }
1050
1051
1052 /**
1053  * Called when a client wants to create a new set.  This is typically
1054  * the first request from a client, and includes the type of set
1055  * operation to be performed.
1056  *
1057  * @param cls client that sent the message
1058  * @param m message sent by the client
1059  */
1060 static void
1061 handle_client_create_set (void *cls,
1062                           const struct GNUNET_SET_CreateMessage *msg)
1063 {
1064   struct ClientState *cs = cls;
1065   struct Set *set;
1066
1067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068               "Client created new set (operation %u)\n",
1069               (uint32_t) ntohl (msg->operation));
1070   if (NULL != cs->set)
1071   {
1072     /* There can only be one set per client */
1073     GNUNET_break (0);
1074     GNUNET_SERVICE_client_drop (cs->client);
1075     return;
1076   }
1077   set = GNUNET_new (struct Set);
1078   switch (ntohl (msg->operation))
1079   {
1080   case GNUNET_SET_OPERATION_INTERSECTION:
1081     set->vt = _GSS_intersection_vt ();
1082     break;
1083   case GNUNET_SET_OPERATION_UNION:
1084     set->vt = _GSS_union_vt ();
1085     break;
1086   default:
1087     GNUNET_free (set);
1088     GNUNET_break (0);
1089     GNUNET_SERVICE_client_drop (cs->client);
1090     return;
1091   }
1092   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1093   set->state = set->vt->create ();
1094   if (NULL == set->state)
1095   {
1096     /* initialization failed (i.e. out of memory) */
1097     GNUNET_free (set);
1098     GNUNET_SERVICE_client_drop (cs->client);
1099     return;
1100   }
1101   set->content = GNUNET_new (struct SetContent);
1102   set->content->refcount = 1;
1103   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1104                                                                  GNUNET_YES);
1105   set->cs = cs;
1106   cs->set = set;
1107   GNUNET_SERVICE_client_continue (cs->client);
1108 }
1109
1110
1111 /**
1112  * Timeout happens iff:
1113  *  - we suggested an operation to our listener,
1114  *    but did not receive a response in time
1115  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1116  *
1117  * @param cls channel context
1118  * @param tc context information (why was this task triggered now)
1119  */
1120 static void
1121 incoming_timeout_cb (void *cls)
1122 {
1123   struct Operation *op = cls;
1124
1125   op->timeout_task = NULL;
1126   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127               "Remote peer's incoming request timed out\n");
1128   incoming_destroy (op);
1129 }
1130
1131
1132 /**
1133  * Method called whenever another peer has added us to a channel the
1134  * other peer initiated.  Only called (once) upon reception of data
1135  * from a channel we listen on.
1136  *
1137  * The channel context represents the operation itself and gets added
1138  * to a DLL, from where it gets looked up when our local listener
1139  * client responds to a proposed/suggested operation or connects and
1140  * associates with this operation.
1141  *
1142  * @param cls closure
1143  * @param channel new handle to the channel
1144  * @param source peer that started the channel
1145  * @return initial channel context for the channel
1146  *         returns NULL on error
1147  */
1148 static void *
1149 channel_new_cb (void *cls,
1150                 struct GNUNET_CADET_Channel *channel,
1151                 const struct GNUNET_PeerIdentity *source)
1152 {
1153   struct Listener *listener = cls;
1154   struct Operation *op;
1155
1156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157               "New incoming channel\n");
1158   op = GNUNET_new (struct Operation);
1159   op->listener = listener;
1160   op->peer = *source;
1161   op->channel = channel;
1162   op->mq = GNUNET_CADET_get_mq (op->channel);
1163   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1164                                        UINT32_MAX);
1165   op->timeout_task
1166     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1167                                     &incoming_timeout_cb,
1168                                     op);
1169   GNUNET_CONTAINER_DLL_insert (listener->op_head,
1170                                listener->op_tail,
1171                                op);
1172   return op;
1173 }
1174
1175
1176 /**
1177  * Function called whenever a channel is destroyed.  Should clean up
1178  * any associated state.  It must NOT call
1179  * GNUNET_CADET_channel_destroy() on the channel.
1180  *
1181  * The peer_disconnect function is part of a a virtual table set initially either
1182  * when a peer creates a new channel with us, or once we create
1183  * a new channel ourselves (evaluate).
1184  *
1185  * Once we know the exact type of operation (union/intersection), the vt is
1186  * replaced with an operation specific instance (_GSS_[op]_vt).
1187  *
1188  * @param channel_ctx place where local state associated
1189  *                   with the channel is stored
1190  * @param channel connection to the other end (henceforth invalid)
1191  */
1192 static void
1193 channel_end_cb (void *channel_ctx,
1194                 const struct GNUNET_CADET_Channel *channel)
1195 {
1196   struct Operation *op = channel_ctx;
1197
1198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199               "channel_end_cb called\n");
1200   op->channel = NULL;
1201   if (NULL != op->listener)
1202     incoming_destroy (op);
1203   else if (NULL != op->set)
1204     op->set->vt->channel_death (op);
1205   else
1206     _GSS_operation_destroy (op,
1207                             GNUNET_YES);
1208   GNUNET_free (op);
1209 }
1210
1211
1212 /**
1213  * Function called whenever an MQ-channel's transmission window size changes.
1214  *
1215  * The first callback in an outgoing channel will be with a non-zero value
1216  * and will mean the channel is connected to the destination.
1217  *
1218  * For an incoming channel it will be called immediately after the
1219  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1220  *
1221  * @param cls Channel closure.
1222  * @param channel Connection to the other end (henceforth invalid).
1223  * @param window_size New window size. If the is more messages than buffer size
1224  *                    this value will be negative..
1225  */
1226 static void
1227 channel_window_cb (void *cls,
1228                    const struct GNUNET_CADET_Channel *channel,
1229                    int window_size)
1230 {
1231   /* FIXME: not implemented, we could do flow control here... */
1232 }
1233
1234
1235 /**
1236  * Called when a client wants to create a new listener.
1237  *
1238  * @param cls client that sent the message
1239  * @param msg message sent by the client
1240  */
1241 static void
1242 handle_client_listen (void *cls,
1243                       const struct GNUNET_SET_ListenMessage *msg)
1244 {
1245   struct ClientState *cs = cls;
1246   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1247     GNUNET_MQ_hd_var_size (incoming_msg,
1248                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1249                            struct OperationRequestMessage,
1250                            NULL),
1251     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1252                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1253                            struct IBFMessage,
1254                            NULL),
1255     GNUNET_MQ_hd_var_size (union_p2p_elements,
1256                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1257                            struct GNUNET_SET_ElementMessage,
1258                            NULL),
1259     GNUNET_MQ_hd_var_size (union_p2p_offer,
1260                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1261                            struct GNUNET_MessageHeader,
1262                            NULL),
1263     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1264                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1265                            struct InquiryMessage,
1266                            NULL),
1267     GNUNET_MQ_hd_var_size (union_p2p_demand,
1268                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1269                            struct GNUNET_MessageHeader,
1270                            NULL),
1271     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1272                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1273                              struct GNUNET_MessageHeader,
1274                              NULL),
1275     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1276                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1277                              struct GNUNET_MessageHeader,
1278                              NULL),
1279     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1280                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1281                              struct GNUNET_MessageHeader,
1282                              NULL),
1283     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1284                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1285                              struct GNUNET_MessageHeader,
1286                              NULL),
1287     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1288                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1289                            struct StrataEstimatorMessage,
1290                            NULL),
1291     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1292                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1293                            struct StrataEstimatorMessage,
1294                            NULL),
1295     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1296                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1297                            struct GNUNET_SET_ElementMessage,
1298                            NULL),
1299     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1300                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1301                              struct IntersectionElementInfoMessage,
1302                              NULL),
1303     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1304                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1305                            struct BFMessage,
1306                            NULL),
1307     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1308                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1309                              struct IntersectionDoneMessage,
1310                              NULL),
1311     GNUNET_MQ_handler_end ()
1312   };
1313   struct Listener *listener;
1314
1315   if (NULL != cs->listener)
1316   {
1317     /* max. one active listener per client! */
1318     GNUNET_break (0);
1319     GNUNET_SERVICE_client_drop (cs->client);
1320     return;
1321   }
1322   listener = GNUNET_new (struct Listener);
1323   listener->cs = cs;
1324   cs->listener = listener;
1325   listener->app_id = msg->app_id;
1326   listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1327   GNUNET_CONTAINER_DLL_insert (listener_head,
1328                                listener_tail,
1329                                listener);
1330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331               "New listener created (op %u, port %s)\n",
1332               listener->operation,
1333               GNUNET_h2s (&listener->app_id));
1334   listener->open_port
1335     = GNUNET_CADET_open_port (cadet,
1336                               &msg->app_id,
1337                               &channel_new_cb,
1338                               listener,
1339                               &channel_window_cb,
1340                               &channel_end_cb,
1341                               cadet_handlers);
1342   GNUNET_SERVICE_client_continue (cs->client);
1343 }
1344
1345
1346 /**
1347  * Called when the listening client rejects an operation
1348  * request by another peer.
1349  *
1350  * @param cls client that sent the message
1351  * @param msg message sent by the client
1352  */
1353 static void
1354 handle_client_reject (void *cls,
1355                       const struct GNUNET_SET_RejectMessage *msg)
1356 {
1357   struct ClientState *cs = cls;
1358   struct Operation *op;
1359
1360   op = get_incoming (ntohl (msg->accept_reject_id));
1361   if (NULL == op)
1362   {
1363     /* no matching incoming operation for this reject;
1364        could be that the other peer already disconnected... */
1365     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1366                 "Client rejected unknown operation %u\n",
1367                 (unsigned int) ntohl (msg->accept_reject_id));
1368     GNUNET_SERVICE_client_continue (cs->client);
1369     return;
1370   }
1371   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1372               "Peer request (op %u, app %s) rejected by client\n",
1373               op->listener->operation,
1374               GNUNET_h2s (&cs->listener->app_id));
1375   GNUNET_CADET_channel_destroy (op->channel);
1376   GNUNET_SERVICE_client_continue (cs->client);
1377 }
1378
1379
1380 /**
1381  * Called when a client wants to add or remove an element to a set it inhabits.
1382  *
1383  * @param cls client that sent the message
1384  * @param msg message sent by the client
1385  */
1386 static int
1387 check_client_mutation (void *cls,
1388                        const struct GNUNET_SET_ElementMessage *msg)
1389 {
1390   /* NOTE: Technically, we should probably check with the
1391      block library whether the element we are given is well-formed */
1392   return GNUNET_OK;
1393 }
1394
1395
1396 /**
1397  * Called when a client wants to add or remove an element to a set it inhabits.
1398  *
1399  * @param cls client that sent the message
1400  * @param msg message sent by the client
1401  */
1402 static void
1403 handle_client_mutation (void *cls,
1404                         const struct GNUNET_SET_ElementMessage *msg)
1405 {
1406   struct ClientState *cs = cls;
1407   struct Set *set;
1408
1409   if (NULL == (set = cs->set))
1410   {
1411     /* client without a set requested an operation */
1412     GNUNET_break (0);
1413     GNUNET_SERVICE_client_drop (cs->client);
1414     return;
1415   }
1416   GNUNET_SERVICE_client_continue (cs->client);
1417
1418   if (0 != set->content->iterator_count)
1419   {
1420     struct PendingMutation *pm;
1421
1422     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423                 "Scheduling mutation on set\n");
1424     pm = GNUNET_new (struct PendingMutation);
1425     pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1426     pm->set = set;
1427     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1428                                       set->content->pending_mutations_tail,
1429                                       pm);
1430     return;
1431   }
1432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1433               "Executing mutation on set\n");
1434   execute_mutation (set,
1435                     msg);
1436 }
1437
1438
1439 /**
1440  * Advance the current generation of a set,
1441  * adding exclusion ranges if necessary.
1442  *
1443  * @param set the set where we want to advance the generation
1444  */
1445 static void
1446 advance_generation (struct Set *set)
1447 {
1448   struct GenerationRange r;
1449
1450   if (set->current_generation == set->content->latest_generation)
1451   {
1452     set->content->latest_generation++;
1453     set->current_generation++;
1454     return;
1455   }
1456
1457   GNUNET_assert (set->current_generation < set->content->latest_generation);
1458
1459   r.start = set->current_generation + 1;
1460   r.end = set->content->latest_generation + 1;
1461   set->content->latest_generation = r.end;
1462   set->current_generation = r.end;
1463   GNUNET_array_append (set->excluded_generations,
1464                        set->excluded_generations_size,
1465                        r);
1466 }
1467
1468
1469 /**
1470  * Called when a client wants to initiate a set operation with another
1471  * peer.  Initiates the CADET connection to the listener and sends the
1472  * request.
1473  *
1474  * @param cls client that sent the message
1475  * @param msg message sent by the client
1476  * @return #GNUNET_OK if the message is well-formed
1477  */
1478 static int
1479 check_client_evaluate (void *cls,
1480                         const struct GNUNET_SET_EvaluateMessage *msg)
1481 {
1482   /* FIXME: suboptimal, even if the context below could be NULL,
1483      there are malformed messages this does not check for... */
1484   return GNUNET_OK;
1485 }
1486
1487
1488 /**
1489  * Called when a client wants to initiate a set operation with another
1490  * peer.  Initiates the CADET connection to the listener and sends the
1491  * request.
1492  *
1493  * @param cls client that sent the message
1494  * @param msg message sent by the client
1495  */
1496 static void
1497 handle_client_evaluate (void *cls,
1498                         const struct GNUNET_SET_EvaluateMessage *msg)
1499 {
1500   struct ClientState *cs = cls;
1501   struct Operation *op = GNUNET_new (struct Operation);
1502   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1503     GNUNET_MQ_hd_var_size (incoming_msg,
1504                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1505                            struct OperationRequestMessage,
1506                            op),
1507     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1508                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1509                            struct IBFMessage,
1510                            op),
1511     GNUNET_MQ_hd_var_size (union_p2p_elements,
1512                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1513                            struct GNUNET_SET_ElementMessage,
1514                            op),
1515     GNUNET_MQ_hd_var_size (union_p2p_offer,
1516                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1517                            struct GNUNET_MessageHeader,
1518                            op),
1519     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1520                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1521                            struct InquiryMessage,
1522                            op),
1523     GNUNET_MQ_hd_var_size (union_p2p_demand,
1524                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1525                            struct GNUNET_MessageHeader,
1526                            op),
1527     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1528                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1529                              struct GNUNET_MessageHeader,
1530                              op),
1531     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1532                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1533                              struct GNUNET_MessageHeader,
1534                              op),
1535     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1536                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1537                              struct GNUNET_MessageHeader,
1538                              op),
1539     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1540                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1541                              struct GNUNET_MessageHeader,
1542                              op),
1543     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1544                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1545                            struct StrataEstimatorMessage,
1546                            op),
1547     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1548                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1549                            struct StrataEstimatorMessage,
1550                            op),
1551     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1552                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1553                            struct GNUNET_SET_ElementMessage,
1554                            op),
1555     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1556                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1557                              struct IntersectionElementInfoMessage,
1558                              op),
1559     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1560                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1561                            struct BFMessage,
1562                            op),
1563     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1564                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1565                              struct IntersectionDoneMessage,
1566                              op),
1567     GNUNET_MQ_handler_end ()
1568   };
1569   struct Set *set;
1570   const struct GNUNET_MessageHeader *context;
1571
1572   if (NULL == (set = cs->set))
1573   {
1574     GNUNET_break (0);
1575     GNUNET_free (op);
1576     GNUNET_SERVICE_client_drop (cs->client);
1577     return;
1578   }
1579   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1580                                        UINT32_MAX);
1581   op->peer = msg->target_peer;
1582   op->result_mode = ntohl (msg->result_mode);
1583   op->client_request_id = ntohl (msg->request_id);
1584   op->byzantine = msg->byzantine;
1585   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1586   op->force_full = msg->force_full;
1587   op->force_delta = msg->force_delta;
1588   context = GNUNET_MQ_extract_nested_mh (msg);
1589
1590   /* Advance generation values, so that
1591      mutations won't interfer with the running operation. */
1592   op->set = set;
1593   op->generation_created = set->current_generation;
1594   advance_generation (set);
1595   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1596                                set->ops_tail,
1597                                op);
1598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1599               "Creating new CADET channel to port %s for set operation type %u\n",
1600               GNUNET_h2s (&msg->app_id),
1601               set->operation);
1602   op->channel = GNUNET_CADET_channel_create (cadet,
1603                                              op,
1604                                              &msg->target_peer,
1605                                              &msg->app_id,
1606                                              GNUNET_CADET_OPTION_RELIABLE,
1607                                              &channel_window_cb,
1608                                              &channel_end_cb,
1609                                              cadet_handlers);
1610   op->mq = GNUNET_CADET_get_mq (op->channel);
1611   op->state = set->vt->evaluate (op,
1612                                  context);
1613   if (NULL == op->state)
1614   {
1615     GNUNET_break (0);
1616     GNUNET_SERVICE_client_drop (cs->client);
1617     return;
1618   }
1619   GNUNET_SERVICE_client_continue (cs->client);
1620 }
1621
1622
1623 /**
1624  * Handle an ack from a client, and send the next element. Note
1625  * that we only expect acks for set elements, not after the
1626  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1627  *
1628  * @param cls client the client
1629  * @param ack the message
1630  */
1631 static void
1632 handle_client_iter_ack (void *cls,
1633                         const struct GNUNET_SET_IterAckMessage *ack)
1634 {
1635   struct ClientState *cs = cls;
1636   struct Set *set;
1637
1638   if (NULL == (set = cs->set))
1639   {
1640     /* client without a set acknowledged receiving a value */
1641     GNUNET_break (0);
1642     GNUNET_SERVICE_client_drop (cs->client);
1643     return;
1644   }
1645   if (NULL == set->iter)
1646   {
1647     /* client sent an ack, but we were not expecting one (as
1648        set iteration has finished) */
1649     GNUNET_break (0);
1650     GNUNET_SERVICE_client_drop (cs->client);
1651     return;
1652   }
1653   GNUNET_SERVICE_client_continue (cs->client);
1654   if (ntohl (ack->send_more))
1655   {
1656     send_client_element (set);
1657   }
1658   else
1659   {
1660     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1661     set->iter = NULL;
1662     set->iteration_id++;
1663   }
1664 }
1665
1666
1667 /**
1668  * Handle a request from the client to copy a set.
1669  *
1670  * @param cls the client
1671  * @param mh the message
1672  */
1673 static void
1674 handle_client_copy_lazy_prepare (void *cls,
1675                                  const struct GNUNET_MessageHeader *mh)
1676 {
1677   struct ClientState *cs = cls;
1678   struct Set *set;
1679   struct LazyCopyRequest *cr;
1680   struct GNUNET_MQ_Envelope *ev;
1681   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1682
1683   if (NULL == (set = cs->set))
1684   {
1685     /* client without a set requested an operation */
1686     GNUNET_break (0);
1687     GNUNET_SERVICE_client_drop (cs->client);
1688     return;
1689   }
1690   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1691               "Client requested creation of lazy copy\n");
1692   cr = GNUNET_new (struct LazyCopyRequest);
1693   cr->cookie = ++lazy_copy_cookie;
1694   cr->source_set = set;
1695   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1696                                lazy_copy_tail,
1697                                cr);
1698   ev = GNUNET_MQ_msg (resp_msg,
1699                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1700   resp_msg->cookie = cr->cookie;
1701   GNUNET_MQ_send (set->cs->mq,
1702                   ev);
1703   GNUNET_SERVICE_client_continue (cs->client);
1704 }
1705
1706
1707 /**
1708  * Handle a request from the client to connect to a copy of a set.
1709  *
1710  * @param cls the client
1711  * @param msg the message
1712  */
1713 static void
1714 handle_client_copy_lazy_connect (void *cls,
1715                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1716 {
1717   struct ClientState *cs = cls;
1718   struct LazyCopyRequest *cr;
1719   struct Set *set;
1720   int found;
1721
1722   if (NULL != cs->set)
1723   {
1724     /* There can only be one set per client */
1725     GNUNET_break (0);
1726     GNUNET_SERVICE_client_drop (cs->client);
1727     return;
1728   }
1729   found = GNUNET_NO;
1730   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1731   {
1732     if (cr->cookie == msg->cookie)
1733     {
1734       found = GNUNET_YES;
1735       break;
1736     }
1737   }
1738   if (GNUNET_NO == found)
1739   {
1740     /* client asked for copy with cookie we don't know */
1741     GNUNET_break (0);
1742     GNUNET_SERVICE_client_drop (cs->client);
1743     return;
1744   }
1745   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1746                                lazy_copy_tail,
1747                                cr);
1748   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1749               "Client %p requested use of lazy copy\n",
1750               cs);
1751   set = GNUNET_new (struct Set);
1752   switch (cr->source_set->operation)
1753   {
1754   case GNUNET_SET_OPERATION_INTERSECTION:
1755     set->vt = _GSS_intersection_vt ();
1756     break;
1757   case GNUNET_SET_OPERATION_UNION:
1758     set->vt = _GSS_union_vt ();
1759     break;
1760   default:
1761     GNUNET_assert (0);
1762     return;
1763   }
1764
1765   if (NULL == set->vt->copy_state)
1766   {
1767     /* Lazy copy not supported for this set operation */
1768     GNUNET_break (0);
1769     GNUNET_free (set);
1770     GNUNET_free (cr);
1771     GNUNET_SERVICE_client_drop (cs->client);
1772     return;
1773   }
1774
1775   set->operation = cr->source_set->operation;
1776   set->state = set->vt->copy_state (cr->source_set->state);
1777   set->content = cr->source_set->content;
1778   set->content->refcount++;
1779
1780   set->current_generation = cr->source_set->current_generation;
1781   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1782   set->excluded_generations
1783     = GNUNET_memdup (cr->source_set->excluded_generations,
1784                      set->excluded_generations_size * sizeof (struct GenerationRange));
1785
1786   /* Advance the generation of the new set, so that mutations to the
1787      of the cloned set and the source set are independent. */
1788   advance_generation (set);
1789   set->cs = cs;
1790   cs->set = set;
1791   GNUNET_free (cr);
1792   GNUNET_SERVICE_client_continue (cs->client);
1793 }
1794
1795
1796 /**
1797  * Handle a request from the client to cancel a running set operation.
1798  *
1799  * @param cls the client
1800  * @param msg the message
1801  */
1802 static void
1803 handle_client_cancel (void *cls,
1804                       const struct GNUNET_SET_CancelMessage *msg)
1805 {
1806   struct ClientState *cs = cls;
1807   struct Set *set;
1808   struct Operation *op;
1809   int found;
1810
1811   if (NULL == (set = cs->set))
1812   {
1813     /* client without a set requested an operation */
1814     GNUNET_break (0);
1815     GNUNET_SERVICE_client_drop (cs->client);
1816     return;
1817   }
1818   found = GNUNET_NO;
1819   for (op = set->ops_head; NULL != op; op = op->next)
1820   {
1821     if (op->client_request_id == ntohl (msg->request_id))
1822     {
1823       found = GNUNET_YES;
1824       break;
1825     }
1826   }
1827   if (GNUNET_NO == found)
1828   {
1829     /* It may happen that the operation was already destroyed due to
1830      * the other peer disconnecting.  The client may not know about this
1831      * yet and try to cancel the (just barely non-existent) operation.
1832      * So this is not a hard error.
1833      */
1834     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1835                 "Client canceled non-existent op %u\n",
1836                 (uint32_t) ntohl (msg->request_id));
1837   }
1838   else
1839   {
1840     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1841                 "Client requested cancel for op %u\n",
1842                 (uint32_t) ntohl (msg->request_id));
1843     _GSS_operation_destroy (op,
1844                             GNUNET_YES);
1845   }
1846   GNUNET_SERVICE_client_continue (cs->client);
1847 }
1848
1849
1850 /**
1851  * Handle a request from the client to accept a set operation that
1852  * came from a remote peer.  We forward the accept to the associated
1853  * operation for handling
1854  *
1855  * @param cls the client
1856  * @param msg the message
1857  */
1858 static void
1859 handle_client_accept (void *cls,
1860                       const struct GNUNET_SET_AcceptMessage *msg)
1861 {
1862   struct ClientState *cs = cls;
1863   struct Set *set;
1864   struct Operation *op;
1865   struct GNUNET_SET_ResultMessage *result_message;
1866   struct GNUNET_MQ_Envelope *ev;
1867   struct Listener *listener;
1868
1869   if (NULL == (set = cs->set))
1870   {
1871     /* client without a set requested to accept */
1872     GNUNET_break (0);
1873     GNUNET_SERVICE_client_drop (cs->client);
1874     return;
1875   }
1876   op = get_incoming (ntohl (msg->accept_reject_id));
1877   if (NULL == op)
1878   {
1879     /* It is not an error if the set op does not exist -- it may
1880      * have been destroyed when the partner peer disconnected. */
1881     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1882                 "Client %p accepted request %u of listener %p that is no longer active\n",
1883                 cs,
1884                 ntohl (msg->accept_reject_id),
1885                 cs->listener);
1886     ev = GNUNET_MQ_msg (result_message,
1887                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1888     result_message->request_id = msg->request_id;
1889     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1890     GNUNET_MQ_send (set->cs->mq,
1891                     ev);
1892     GNUNET_SERVICE_client_continue (cs->client);
1893     return;
1894   }
1895   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1896               "Client accepting request %u\n",
1897               (uint32_t) ntohl (msg->accept_reject_id));
1898   listener = op->listener;
1899   op->listener = NULL;
1900   GNUNET_CONTAINER_DLL_remove (listener->op_head,
1901                                listener->op_tail,
1902                                op);
1903   op->set = set;
1904   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1905                                set->ops_tail,
1906                                op);
1907   op->client_request_id = ntohl (msg->request_id);
1908   op->result_mode = ntohl (msg->result_mode);
1909   op->byzantine = msg->byzantine;
1910   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1911   op->force_full = msg->force_full;
1912   op->force_delta = msg->force_delta;
1913
1914   /* Advance generation values, so that future mutations do not
1915      interfer with the running operation. */
1916   op->generation_created = set->current_generation;
1917   advance_generation (set);
1918   GNUNET_assert (NULL == op->state);
1919   op->state = set->vt->accept (op);
1920   if (NULL == op->state)
1921   {
1922     GNUNET_break (0);
1923     GNUNET_SERVICE_client_drop (cs->client);
1924     return;
1925   }
1926   /* Now allow CADET to continue, as we did not do this in
1927      #handle_incoming_msg (as we wanted to first see if the
1928      local client would accept the request). */
1929   GNUNET_CADET_receive_done (op->channel);
1930   GNUNET_SERVICE_client_continue (cs->client);
1931 }
1932
1933
1934 /**
1935  * Called to clean up, after a shutdown has been requested.
1936  *
1937  * @param cls closure, NULL
1938  */
1939 static void
1940 shutdown_task (void *cls)
1941 {
1942   /* Delay actual shutdown to allow service to disconnect clients */
1943   in_shutdown = GNUNET_YES;
1944   if (0 == num_clients)
1945   {
1946     if (NULL != cadet)
1947     {
1948       GNUNET_CADET_disconnect (cadet);
1949       cadet = NULL;
1950     }
1951   }
1952   GNUNET_STATISTICS_destroy (_GSS_statistics,
1953                              GNUNET_YES);
1954   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1955               "handled shutdown request\n");
1956 }
1957
1958
1959 /**
1960  * Function called by the service's run
1961  * method to run service-specific setup code.
1962  *
1963  * @param cls closure
1964  * @param cfg configuration to use
1965  * @param service the initialized service
1966  */
1967 static void
1968 run (void *cls,
1969      const struct GNUNET_CONFIGURATION_Handle *cfg,
1970      struct GNUNET_SERVICE_Handle *service)
1971 {
1972   /* FIXME: need to modify SERVICE (!) API to allow
1973      us to run a shutdown task *after* clients were
1974      forcefully disconnected! */
1975   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1976                                  NULL);
1977   _GSS_statistics = GNUNET_STATISTICS_create ("set",
1978                                               cfg);
1979   cadet = GNUNET_CADET_connect (cfg);
1980   if (NULL == cadet)
1981   {
1982     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1983                 _("Could not connect to CADET service\n"));
1984     GNUNET_SCHEDULER_shutdown ();
1985     return;
1986   }
1987 }
1988
1989
1990 /**
1991  * Define "main" method using service macro.
1992  */
1993 GNUNET_SERVICE_MAIN
1994 ("set",
1995  GNUNET_SERVICE_OPTION_NONE,
1996  &run,
1997  &client_connect_cb,
1998  &client_disconnect_cb,
1999  NULL,
2000  GNUNET_MQ_hd_fixed_size (client_accept,
2001                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2002                           struct GNUNET_SET_AcceptMessage,
2003                           NULL),
2004  GNUNET_MQ_hd_fixed_size (client_iter_ack,
2005                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2006                           struct GNUNET_SET_IterAckMessage,
2007                           NULL),
2008  GNUNET_MQ_hd_var_size (client_mutation,
2009                         GNUNET_MESSAGE_TYPE_SET_ADD,
2010                         struct GNUNET_SET_ElementMessage,
2011                         NULL),
2012  GNUNET_MQ_hd_fixed_size (client_create_set,
2013                           GNUNET_MESSAGE_TYPE_SET_CREATE,
2014                           struct GNUNET_SET_CreateMessage,
2015                           NULL),
2016  GNUNET_MQ_hd_fixed_size (client_iterate,
2017                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2018                           struct GNUNET_MessageHeader,
2019                           NULL),
2020  GNUNET_MQ_hd_var_size (client_evaluate,
2021                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2022                         struct GNUNET_SET_EvaluateMessage,
2023                         NULL),
2024  GNUNET_MQ_hd_fixed_size (client_listen,
2025                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
2026                           struct GNUNET_SET_ListenMessage,
2027                           NULL),
2028  GNUNET_MQ_hd_fixed_size (client_reject,
2029                           GNUNET_MESSAGE_TYPE_SET_REJECT,
2030                           struct GNUNET_SET_RejectMessage,
2031                           NULL),
2032  GNUNET_MQ_hd_var_size (client_mutation,
2033                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
2034                         struct GNUNET_SET_ElementMessage,
2035                         NULL),
2036  GNUNET_MQ_hd_fixed_size (client_cancel,
2037                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2038                           struct GNUNET_SET_CancelMessage,
2039                           NULL),
2040  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2041                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2042                           struct GNUNET_MessageHeader,
2043                           NULL),
2044  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2045                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2046                           struct GNUNET_SET_CopyLazyConnectMessage,
2047                           NULL),
2048  GNUNET_MQ_handler_end ());
2049
2050
2051 /* end of gnunet-service-set.c */