remove CYGWIN codeblocks, drop vendored Windows openvpn, drop win32 specific files.
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
31
32 /**
33  * How long do we hold on to an incoming channel if there is
34  * no local listener before giving up?
35  */
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38
/**
 * Lazy copy request made by a client.  Requests are kept in the
 * file-global list anchored at `lazy_copy_head` / `lazy_copy_tail`
 * and are dropped again when their source set is destroyed.
 */
struct LazyCopyRequest {
  /**
   * Previous request in the doubly linked list.
   */
  struct LazyCopyRequest *prev;

  /**
   * Next request in the doubly linked list.
   */
  struct LazyCopyRequest *next;

  /**
   * Which set are we supposed to copy?
   */
  struct Set *source_set;

  /**
   * Cookie identifying the request.
   */
  uint32_t cookie;
};
63
64
/**
 * A listener is owned by a client and waits for evaluation
 * requests from remote peers.
 */
struct Listener {
  /**
   * Next listener in the global doubly linked list.
   */
  struct Listener *next;

  /**
   * Previous listener in the global doubly linked list.
   */
  struct Listener *prev;

  /**
   * Head of DLL of operations this listener is responsible for.
   * Once the client has accepted/declined the operation, the
   * operation is moved to the respective set's operation DLLs.
   */
  struct Operation *op_head;

  /**
   * Tail of DLL of operations this listener is responsible for.
   * Once the client has accepted/declined the operation, the
   * operation is moved to the respective set's operation DLLs.
   */
  struct Operation *op_tail;

  /**
   * Client that owns the listener.
   * Only one client may own a listener.
   */
  struct ClientState *cs;

  /**
   * The port we are listening on with CADET.
   */
  struct GNUNET_CADET_Port *open_port;

  /**
   * Application ID for the operation, used to distinguish
   * multiple operations of the same type with the same peer.
   */
  struct GNUNET_HashCode app_id;

  /**
   * The type of the operation.
   */
  enum GNUNET_SET_OperationType operation;
};
116
117
/**
 * Handle to the cadet service, used to listen for and connect to
 * remote peers.
 */
static struct GNUNET_CADET_Handle *cadet;

/**
 * Head of the DLL of pending lazy copy requests (shared by all clients).
 */
static struct LazyCopyRequest *lazy_copy_head;

/**
 * Tail of the DLL of pending lazy copy requests (shared by all clients).
 */
static struct LazyCopyRequest *lazy_copy_tail;

/**
 * Generator for unique cookie we set per lazy copy request.
 */
static uint32_t lazy_copy_cookie;

/**
 * Statistics handle.  Not `static`, so it has external linkage.
 */
struct GNUNET_STATISTICS_Handle *_GSS_statistics;

/**
 * Head of the doubly linked list of all listeners.
 */
static struct Listener *listener_head;

/**
 * Tail of the doubly linked list of all listeners.
 */
static struct Listener *listener_tail;

/**
 * Number of active clients.
 */
static unsigned int num_clients;

/**
 * Are we in shutdown? If #GNUNET_YES and the number of clients
 * drops to zero, disconnect from CADET.
 */
static int in_shutdown;

/**
 * Counter for allocating unique IDs for clients, used to identify
 * incoming operation requests from remote peers, that the client can
 * choose to accept or refuse.  0 must not be used (reserved for
 * uninitialized).
 */
static uint32_t suggest_id;
172
173
174 /**
175  * Get the incoming socket associated with the given id.
176  *
177  * @param listener the listener to look in
178  * @param id id to look for
179  * @return the incoming socket associated with the id,
180  *         or NULL if there is none
181  */
182 static struct Operation *
183 get_incoming(uint32_t id)
184 {
185   for (struct Listener *listener = listener_head; NULL != listener;
186        listener = listener->next)
187     {
188       for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
189         if (op->suggest_id == id)
190           return op;
191     }
192   return NULL;
193 }
194
195
196 /**
197  * Destroy an incoming request from a remote peer
198  *
199  * @param op remote request to destroy
200  */
201 static void
202 incoming_destroy(struct Operation *op)
203 {
204   struct Listener *listener;
205
206   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
207              "Destroying incoming operation %p\n",
208              op);
209   if (NULL != (listener = op->listener))
210     {
211       GNUNET_CONTAINER_DLL_remove(listener->op_head, listener->op_tail, op);
212       op->listener = NULL;
213     }
214   if (NULL != op->timeout_task)
215     {
216       GNUNET_SCHEDULER_cancel(op->timeout_task);
217       op->timeout_task = NULL;
218     }
219   _GSS_operation_destroy2(op);
220 }
221
222
/**
 * Context for the #garbage_collect_cb(); filled in by
 * #collect_generation_garbage().
 */
struct GarbageContext {
  /**
   * Map for which we are garbage collecting removed elements.
   */
  struct GNUNET_CONTAINER_MultiHashMap *map;

  /**
   * Lowest generation for which an operation is still pending
   * (UINT_MAX if the set has no operations).
   */
  unsigned int min_op_generation;

  /**
   * Largest generation for which an operation is still pending
   * (0 if the set has no operations).
   */
  unsigned int max_op_generation;
};
242
243
/**
 * Function invoked to check if an element can be removed from
 * the set's history because it is no longer needed.
 *
 * NOTE: the actual collection logic below is commented out, so this
 * callback currently does nothing except tell the iterator to go on.
 *
 * @param cls the `struct GarbageContext *` (currently unused)
 * @param key key of the element in the map (currently unused)
 * @param value the `struct ElementEntry *` (currently unused)
 * @return #GNUNET_OK (continue to iterate)
 */
static int
garbage_collect_cb(void *cls, const struct GNUNET_HashCode *key, void *value)
{
  /* Disabled implementation, kept for reference: */
  //struct GarbageContext *gc = cls;
  //struct ElementEntry *ee = value;

  //if (GNUNET_YES != ee->removed)
  //  return GNUNET_OK;
  //if ( (gc->max_op_generation < ee->generation_added) ||
  //     (ee->generation_removed > gc->min_op_generation) )
  //{
  //  GNUNET_assert (GNUNET_YES ==
  //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
  //                                                       key,
  //                                                       ee));
  //  GNUNET_free (ee);
  //}
  return GNUNET_OK;
}
272
273
274 /**
275  * Collect and destroy elements that are not needed anymore, because
276  * their lifetime (as determined by their generation) does not overlap
277  * with any active set operation.
278  *
279  * @param set set to garbage collect
280  */
281 static void
282 collect_generation_garbage(struct Set *set)
283 {
284   struct GarbageContext gc;
285
286   gc.min_op_generation = UINT_MAX;
287   gc.max_op_generation = 0;
288   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
289     {
290       gc.min_op_generation =
291         GNUNET_MIN(gc.min_op_generation, op->generation_created);
292       gc.max_op_generation =
293         GNUNET_MAX(gc.max_op_generation, op->generation_created);
294     }
295   gc.map = set->content->elements;
296   GNUNET_CONTAINER_multihashmap_iterate(set->content->elements,
297                                         &garbage_collect_cb,
298                                         &gc);
299 }
300
301
302 /**
303  * Is @a generation in the range of exclusions?
304  *
305  * @param generation generation to query
306  * @param excluded array of generations where the element is excluded
307  * @param excluded_size length of the @a excluded array
308  * @return #GNUNET_YES if @a generation is in any of the ranges
309  */
310 static int
311 is_excluded_generation(unsigned int generation,
312                        struct GenerationRange *excluded,
313                        unsigned int excluded_size)
314 {
315   for (unsigned int i = 0; i < excluded_size; i++)
316     if ((generation >= excluded[i].start) && (generation < excluded[i].end))
317       return GNUNET_YES;
318   return GNUNET_NO;
319 }
320
321
322 /**
323  * Is element @a ee part of the set during @a query_generation?
324  *
325  * @param ee element to test
326  * @param query_generation generation to query
327  * @param excluded array of generations where the element is excluded
328  * @param excluded_size length of the @a excluded array
329  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
330  */
331 static int
332 is_element_of_generation(struct ElementEntry *ee,
333                          unsigned int query_generation,
334                          struct GenerationRange *excluded,
335                          unsigned int excluded_size)
336 {
337   struct MutationEvent *mut;
338   int is_present;
339
340   GNUNET_assert(NULL != ee->mutations);
341   if (GNUNET_YES ==
342       is_excluded_generation(query_generation, excluded, excluded_size))
343     {
344       GNUNET_break(0);
345       return GNUNET_NO;
346     }
347
348   is_present = GNUNET_NO;
349
350   /* Could be made faster with binary search, but lists
351      are small, so why bother. */
352   for (unsigned int i = 0; i < ee->mutations_size; i++)
353     {
354       mut = &ee->mutations[i];
355
356       if (mut->generation > query_generation)
357         {
358           /* The mutation doesn't apply to our generation
359              anymore.  We can'b break here, since mutations aren't
360              sorted by generation. */
361           continue;
362         }
363
364       if (GNUNET_YES ==
365           is_excluded_generation(mut->generation, excluded, excluded_size))
366         {
367           /* The generation is excluded (because it belongs to another
368              fork via a lazy copy) and thus mutations aren't considered
369              for membership testing. */
370           continue;
371         }
372
373       /* This would be an inconsistency in how we manage mutations. */
374       if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
375         GNUNET_assert(0);
376       /* Likewise. */
377       if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
378         GNUNET_assert(0);
379
380       is_present = mut->added;
381     }
382
383   return is_present;
384 }
385
386
387 /**
388  * Is element @a ee part of the set used by @a op?
389  *
390  * @param ee element to test
391  * @param op operation the defines the set and its generation
392  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
393  */
394 int
395 _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
396 {
397   return is_element_of_generation(ee,
398                                   op->generation_created,
399                                   op->set->excluded_generations,
400                                   op->set->excluded_generations_size);
401 }
402
403
404 /**
405  * Destroy the given operation.  Used for any operation where both
406  * peers were known and that thus actually had a vt and channel.  Must
407  * not be used for operations where 'listener' is still set and we do
408  * not know the other peer.
409  *
410  * Call the implementation-specific cancel function of the operation.
411  * Disconnects from the remote peer.  Does not disconnect the client,
412  * as there may be multiple operations per set.
413  *
414  * @param op operation to destroy
415  * @param gc #GNUNET_YES to perform garbage collection on the set
416  */
417 void
418 _GSS_operation_destroy(struct Operation *op, int gc)
419 {
420   struct Set *set = op->set;
421   struct GNUNET_CADET_Channel *channel;
422
423   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
424   GNUNET_assert(NULL == op->listener);
425   if (NULL != op->state)
426     {
427       set->vt->cancel(op);
428       op->state = NULL;
429     }
430   if (NULL != set)
431     {
432       GNUNET_CONTAINER_DLL_remove(set->ops_head, set->ops_tail, op);
433       op->set = NULL;
434     }
435   if (NULL != op->context_msg)
436     {
437       GNUNET_free(op->context_msg);
438       op->context_msg = NULL;
439     }
440   if (NULL != (channel = op->channel))
441     {
442       /* This will free op; called conditionally as this helper function
443          is also called from within the channel disconnect handler. */
444       op->channel = NULL;
445       GNUNET_CADET_channel_destroy(channel);
446     }
447   if ((NULL != set) && (GNUNET_YES == gc))
448     collect_generation_garbage(set);
449   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
450    * there was a channel end handler that will free 'op' on the call stack. */
451 }
452
453
454 /**
455  * Callback called when a client connects to the service.
456  *
457  * @param cls closure for the service
458  * @param c the new client that connected to the service
459  * @param mq the message queue used to send messages to the client
460  * @return @a `struct ClientState`
461  */
462 static void *
463 client_connect_cb(void *cls,
464                   struct GNUNET_SERVICE_Client *c,
465                   struct GNUNET_MQ_Handle *mq)
466 {
467   struct ClientState *cs;
468
469   num_clients++;
470   cs = GNUNET_new(struct ClientState);
471   cs->client = c;
472   cs->mq = mq;
473   return cs;
474 }
475
476
477 /**
478  * Iterator over hash map entries to free element entries.
479  *
480  * @param cls closure
481  * @param key current key code
482  * @param value a `struct ElementEntry *` to be free'd
483  * @return #GNUNET_YES (continue to iterate)
484  */
485 static int
486 destroy_elements_iterator(void *cls,
487                           const struct GNUNET_HashCode *key,
488                           void *value)
489 {
490   struct ElementEntry *ee = value;
491
492   GNUNET_free_non_null(ee->mutations);
493   GNUNET_free(ee);
494   return GNUNET_YES;
495 }
496
497
498 /**
499  * Clean up after a client has disconnected
500  *
501  * @param cls closure, unused
502  * @param client the client to clean up after
503  * @param internal_cls the `struct ClientState`
504  */
505 static void
506 client_disconnect_cb(void *cls,
507                      struct GNUNET_SERVICE_Client *client,
508                      void *internal_cls)
509 {
510   struct ClientState *cs = internal_cls;
511   struct Operation *op;
512   struct Listener *listener;
513   struct Set *set;
514
515   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
516   if (NULL != (set = cs->set))
517     {
518       struct SetContent *content = set->content;
519       struct PendingMutation *pm;
520       struct PendingMutation *pm_current;
521       struct LazyCopyRequest *lcr;
522
523       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
524       /* Destroy pending set operations */
525       while (NULL != set->ops_head)
526         _GSS_operation_destroy(set->ops_head, GNUNET_NO);
527
528       /* Destroy operation-specific state */
529       GNUNET_assert(NULL != set->state);
530       set->vt->destroy_set(set->state);
531       set->state = NULL;
532
533       /* Clean up ongoing iterations */
534       if (NULL != set->iter)
535         {
536           GNUNET_CONTAINER_multihashmap_iterator_destroy(set->iter);
537           set->iter = NULL;
538           set->iteration_id++;
539         }
540
541       /* discard any pending mutations that reference this set */
542       pm = content->pending_mutations_head;
543       while (NULL != pm)
544         {
545           pm_current = pm;
546           pm = pm->next;
547           if (pm_current->set == set)
548             {
549               GNUNET_CONTAINER_DLL_remove(content->pending_mutations_head,
550                                           content->pending_mutations_tail,
551                                           pm_current);
552               GNUNET_free(pm_current);
553             }
554         }
555
556       /* free set content (or at least decrement RC) */
557       set->content = NULL;
558       GNUNET_assert(0 != content->refcount);
559       content->refcount--;
560       if (0 == content->refcount)
561         {
562           GNUNET_assert(NULL != content->elements);
563           GNUNET_CONTAINER_multihashmap_iterate(content->elements,
564                                                 &destroy_elements_iterator,
565                                                 NULL);
566           GNUNET_CONTAINER_multihashmap_destroy(content->elements);
567           content->elements = NULL;
568           GNUNET_free(content);
569         }
570       GNUNET_free_non_null(set->excluded_generations);
571       set->excluded_generations = NULL;
572
573       /* remove set from pending copy requests */
574       lcr = lazy_copy_head;
575       while (NULL != lcr)
576         {
577           struct LazyCopyRequest *lcr_current = lcr;
578
579           lcr = lcr->next;
580           if (lcr_current->source_set == set)
581             {
582               GNUNET_CONTAINER_DLL_remove(lazy_copy_head,
583                                           lazy_copy_tail,
584                                           lcr_current);
585               GNUNET_free(lcr_current);
586             }
587         }
588       GNUNET_free(set);
589     }
590
591   if (NULL != (listener = cs->listener))
592     {
593       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
594       GNUNET_CADET_close_port(listener->open_port);
595       listener->open_port = NULL;
596       while (NULL != (op = listener->op_head))
597         {
598           GNUNET_log(GNUNET_ERROR_TYPE_INFO,
599                      "Destroying incoming operation `%u' from peer `%s'\n",
600                      (unsigned int)op->client_request_id,
601                      GNUNET_i2s(&op->peer));
602           incoming_destroy(op);
603         }
604       GNUNET_CONTAINER_DLL_remove(listener_head, listener_tail, listener);
605       GNUNET_free(listener);
606     }
607   GNUNET_free(cs);
608   num_clients--;
609   if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
610     {
611       if (NULL != cadet)
612         {
613           GNUNET_CADET_disconnect(cadet);
614           cadet = NULL;
615         }
616     }
617 }
618
619
620 /**
621  * Check a request for a set operation from another peer.
622  *
623  * @param cls the operation state
624  * @param msg the received message
625  * @return #GNUNET_OK if the channel should be kept alive,
626  *         #GNUNET_SYSERR to destroy the channel
627  */
628 static int
629 check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
630 {
631   struct Operation *op = cls;
632   struct Listener *listener = op->listener;
633   const struct GNUNET_MessageHeader *nested_context;
634
635   /* double operation request */
636   if (0 != op->suggest_id)
637     {
638       GNUNET_break_op(0);
639       return GNUNET_SYSERR;
640     }
641   /* This should be equivalent to the previous condition, but can't hurt to check twice */
642   if (NULL == op->listener)
643     {
644       GNUNET_break(0);
645       return GNUNET_SYSERR;
646     }
647   if (listener->operation !=
648       (enum GNUNET_SET_OperationType)ntohl(msg->operation))
649     {
650       GNUNET_break_op(0);
651       return GNUNET_SYSERR;
652     }
653   nested_context = GNUNET_MQ_extract_nested_mh(msg);
654   if ((NULL != nested_context) &&
655       (ntohs(nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
656     {
657       GNUNET_break_op(0);
658       return GNUNET_SYSERR;
659     }
660   return GNUNET_OK;
661 }
662
663
664 /**
665  * Handle a request for a set operation from another peer.  Checks if we
666  * have a listener waiting for such a request (and in that case initiates
667  * asking the listener about accepting the connection). If no listener
668  * is waiting, we queue the operation request in hope that a listener
669  * shows up soon (before timeout).
670  *
671  * This msg is expected as the first and only msg handled through the
672  * non-operation bound virtual table, acceptance of this operation replaces
673  * our virtual table and subsequent msgs would be routed differently (as
674  * we then know what type of operation this is).
675  *
676  * @param cls the operation state
677  * @param msg the received message
678  * @return #GNUNET_OK if the channel should be kept alive,
679  *         #GNUNET_SYSERR to destroy the channel
680  */
681 static void
682 handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
683 {
684   struct Operation *op = cls;
685   struct Listener *listener = op->listener;
686   const struct GNUNET_MessageHeader *nested_context;
687   struct GNUNET_MQ_Envelope *env;
688   struct GNUNET_SET_RequestMessage *cmsg;
689
690   nested_context = GNUNET_MQ_extract_nested_mh(msg);
691   /* Make a copy of the nested_context (application-specific context
692      information that is opaque to set) so we can pass it to the
693      listener later on */
694   if (NULL != nested_context)
695     op->context_msg = GNUNET_copy_message(nested_context);
696   op->remote_element_count = ntohl(msg->element_count);
697   GNUNET_log(
698     GNUNET_ERROR_TYPE_DEBUG,
699     "Received P2P operation request (op %u, port %s) for active listener\n",
700     (uint32_t)ntohl(msg->operation),
701     GNUNET_h2s(&op->listener->app_id));
702   GNUNET_assert(0 == op->suggest_id);
703   if (0 == suggest_id)
704     suggest_id++;
705   op->suggest_id = suggest_id++;
706   GNUNET_assert(NULL != op->timeout_task);
707   GNUNET_SCHEDULER_cancel(op->timeout_task);
708   op->timeout_task = NULL;
709   env = GNUNET_MQ_msg_nested_mh(cmsg,
710                                 GNUNET_MESSAGE_TYPE_SET_REQUEST,
711                                 op->context_msg);
712   GNUNET_log(
713     GNUNET_ERROR_TYPE_DEBUG,
714     "Suggesting incoming request with accept id %u to listener %p of client %p\n",
715     op->suggest_id,
716     listener,
717     listener->cs);
718   cmsg->accept_id = htonl(op->suggest_id);
719   cmsg->peer_id = op->peer;
720   GNUNET_MQ_send(listener->cs->mq, env);
721   /* NOTE: GNUNET_CADET_receive_done() will be called in
722    #handle_client_accept() */
723 }
724
725
726 /**
727  * Add an element to @a set as specified by @a msg
728  *
729  * @param set set to manipulate
730  * @param msg message specifying the change
731  */
732 static void
733 execute_add(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
734 {
735   struct GNUNET_SET_Element el;
736   struct ElementEntry *ee;
737   struct GNUNET_HashCode hash;
738
739   GNUNET_assert(GNUNET_MESSAGE_TYPE_SET_ADD == ntohs(msg->header.type));
740   el.size = ntohs(msg->header.size) - sizeof(*msg);
741   el.data = &msg[1];
742   el.element_type = ntohs(msg->element_type);
743   GNUNET_SET_element_hash(&el, &hash);
744   ee = GNUNET_CONTAINER_multihashmap_get(set->content->elements, &hash);
745   if (NULL == ee)
746     {
747       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
748                  "Client inserts element %s of size %u\n",
749                  GNUNET_h2s(&hash),
750                  el.size);
751       ee = GNUNET_malloc(el.size + sizeof(*ee));
752       ee->element.size = el.size;
753       GNUNET_memcpy(&ee[1], el.data, el.size);
754       ee->element.data = &ee[1];
755       ee->element.element_type = el.element_type;
756       ee->remote = GNUNET_NO;
757       ee->mutations = NULL;
758       ee->mutations_size = 0;
759       ee->element_hash = hash;
760       GNUNET_break(GNUNET_YES ==
761                    GNUNET_CONTAINER_multihashmap_put(
762                      set->content->elements,
763                      &ee->element_hash,
764                      ee,
765                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
766     }
767   else if (GNUNET_YES ==
768            is_element_of_generation(ee,
769                                     set->current_generation,
770                                     set->excluded_generations,
771                                     set->excluded_generations_size))
772     {
773       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
774                  "Client inserted element %s of size %u twice (ignored)\n",
775                  GNUNET_h2s(&hash),
776                  el.size);
777
778       /* same element inserted twice */
779       return;
780     }
781
782   {
783     struct MutationEvent mut = { .generation = set->current_generation,
784                                  .added = GNUNET_YES };
785     GNUNET_array_append(ee->mutations, ee->mutations_size, mut);
786   }
787   set->vt->add(set->state, ee);
788 }
789
790
791 /**
792  * Remove an element from @a set as specified by @a msg
793  *
794  * @param set set to manipulate
795  * @param msg message specifying the change
796  */
797 static void
798 execute_remove(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
799 {
800   struct GNUNET_SET_Element el;
801   struct ElementEntry *ee;
802   struct GNUNET_HashCode hash;
803
804   GNUNET_assert(GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs(msg->header.type));
805   el.size = ntohs(msg->header.size) - sizeof(*msg);
806   el.data = &msg[1];
807   el.element_type = ntohs(msg->element_type);
808   GNUNET_SET_element_hash(&el, &hash);
809   ee = GNUNET_CONTAINER_multihashmap_get(set->content->elements, &hash);
810   if (NULL == ee)
811     {
812       /* Client tried to remove non-existing element. */
813       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
814                  "Client removes non-existing element of size %u\n",
815                  el.size);
816       return;
817     }
818   if (GNUNET_NO == is_element_of_generation(ee,
819                                             set->current_generation,
820                                             set->excluded_generations,
821                                             set->excluded_generations_size))
822     {
823       /* Client tried to remove element twice */
824       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
825                  "Client removed element of size %u twice (ignored)\n",
826                  el.size);
827       return;
828     }
829   else
830     {
831       struct MutationEvent mut = { .generation = set->current_generation,
832                                    .added = GNUNET_NO };
833
834       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
835                  "Client removes element of size %u\n",
836                  el.size);
837
838       GNUNET_array_append(ee->mutations, ee->mutations_size, mut);
839     }
840   set->vt->remove(set->state, ee);
841 }
842
843
844 /**
845  * Perform a mutation on a set as specified by the @a msg
846  *
847  * @param set the set to mutate
848  * @param msg specification of what to change
849  */
850 static void
851 execute_mutation(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
852 {
853   switch (ntohs(msg->header.type))
854     {
855     case GNUNET_MESSAGE_TYPE_SET_ADD:
856       execute_add(set, msg);
857       break;
858
859     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
860       execute_remove(set, msg);
861       break;
862
863     default:
864       GNUNET_break(0);
865     }
866 }
867
868
869 /**
870  * Execute mutations that were delayed on a set because of
871  * pending operations.
872  *
873  * @param set the set to execute mutations on
874  */
875 static void
876 execute_delayed_mutations(struct Set *set)
877 {
878   struct PendingMutation *pm;
879
880   if (0 != set->content->iterator_count)
881     return; /* still cannot do this */
882   while (NULL != (pm = set->content->pending_mutations_head))
883     {
884       GNUNET_CONTAINER_DLL_remove(set->content->pending_mutations_head,
885                                   set->content->pending_mutations_tail,
886                                   pm);
887       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
888                  "Executing pending mutation on %p.\n",
889                  pm->set);
890       execute_mutation(pm->set, pm->msg);
891       GNUNET_free(pm->msg);
892       GNUNET_free(pm);
893     }
894 }
895
896
897 /**
898  * Send the next element of a set to the set's client.  The next element is given by
899  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
900  * are no more elements in the set.  The caller must ensure that the set's iterator is
901  * valid.
902  *
903  * The client will acknowledge each received element with a
904  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
905  * #handle_client_iter_ack() will then trigger the next transmission.
906  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
907  *
908  * @param set set that should send its next element to its client
909  */
910 static void
911 send_client_element(struct Set *set)
912 {
913   int ret;
914   struct ElementEntry *ee;
915   struct GNUNET_MQ_Envelope *ev;
916   struct GNUNET_SET_IterResponseMessage *msg;
917
918   GNUNET_assert(NULL != set->iter);
919   do
920     {
921       ret = GNUNET_CONTAINER_multihashmap_iterator_next(set->iter,
922                                                         NULL,
923                                                         (const void **)&ee);
924       if (GNUNET_NO == ret)
925         {
926           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
927           ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
928           GNUNET_CONTAINER_multihashmap_iterator_destroy(set->iter);
929           set->iter = NULL;
930           set->iteration_id++;
931           GNUNET_assert(set->content->iterator_count > 0);
932           set->content->iterator_count--;
933           execute_delayed_mutations(set);
934           GNUNET_MQ_send(set->cs->mq, ev);
935           return;
936         }
937       GNUNET_assert(NULL != ee);
938     }
939   while (GNUNET_NO ==
940          is_element_of_generation(ee,
941                                   set->iter_generation,
942                                   set->excluded_generations,
943                                   set->excluded_generations_size));
944   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
945              "Sending iteration element on %p.\n",
946              set);
947   ev = GNUNET_MQ_msg_extra(msg,
948                            ee->element.size,
949                            GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
950   GNUNET_memcpy(&msg[1], ee->element.data, ee->element.size);
951   msg->element_type = htons(ee->element.element_type);
952   msg->iteration_id = htons(set->iteration_id);
953   GNUNET_MQ_send(set->cs->mq, ev);
954 }
955
956
957 /**
958  * Called when a client wants to iterate the elements of a set.
959  * Checks if we have a set associated with the client and if we
960  * can right now start an iteration. If all checks out, starts
961  * sending the elements of the set to the client.
962  *
963  * @param cls client that sent the message
964  * @param m message sent by the client
965  */
966 static void
967 handle_client_iterate(void *cls, const struct GNUNET_MessageHeader *m)
968 {
969   struct ClientState *cs = cls;
970   struct Set *set;
971
972   if (NULL == (set = cs->set))
973     {
974       /* attempt to iterate over a non existing set */
975       GNUNET_break(0);
976       GNUNET_SERVICE_client_drop(cs->client);
977       return;
978     }
979   if (NULL != set->iter)
980     {
981       /* Only one concurrent iterate-action allowed per set */
982       GNUNET_break(0);
983       GNUNET_SERVICE_client_drop(cs->client);
984       return;
985     }
986   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
987              "Iterating set %p in gen %u with %u content elements\n",
988              (void *)set,
989              set->current_generation,
990              GNUNET_CONTAINER_multihashmap_size(set->content->elements));
991   GNUNET_SERVICE_client_continue(cs->client);
992   set->content->iterator_count++;
993   set->iter =
994     GNUNET_CONTAINER_multihashmap_iterator_create(set->content->elements);
995   set->iter_generation = set->current_generation;
996   send_client_element(set);
997 }
998
999
1000 /**
1001  * Called when a client wants to create a new set.  This is typically
1002  * the first request from a client, and includes the type of set
1003  * operation to be performed.
1004  *
1005  * @param cls client that sent the message
1006  * @param m message sent by the client
1007  */
1008 static void
1009 handle_client_create_set(void *cls, const struct GNUNET_SET_CreateMessage *msg)
1010 {
1011   struct ClientState *cs = cls;
1012   struct Set *set;
1013
1014   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1015              "Client created new set (operation %u)\n",
1016              (uint32_t)ntohl(msg->operation));
1017   if (NULL != cs->set)
1018     {
1019       /* There can only be one set per client */
1020       GNUNET_break(0);
1021       GNUNET_SERVICE_client_drop(cs->client);
1022       return;
1023     }
1024   set = GNUNET_new(struct Set);
1025   switch (ntohl(msg->operation))
1026     {
1027     case GNUNET_SET_OPERATION_INTERSECTION:
1028       set->vt = _GSS_intersection_vt();
1029       break;
1030
1031     case GNUNET_SET_OPERATION_UNION:
1032       set->vt = _GSS_union_vt();
1033       break;
1034
1035     default:
1036       GNUNET_free(set);
1037       GNUNET_break(0);
1038       GNUNET_SERVICE_client_drop(cs->client);
1039       return;
1040     }
1041   set->operation = (enum GNUNET_SET_OperationType)ntohl(msg->operation);
1042   set->state = set->vt->create();
1043   if (NULL == set->state)
1044     {
1045       /* initialization failed (i.e. out of memory) */
1046       GNUNET_free(set);
1047       GNUNET_SERVICE_client_drop(cs->client);
1048       return;
1049     }
1050   set->content = GNUNET_new(struct SetContent);
1051   set->content->refcount = 1;
1052   set->content->elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
1053   set->cs = cs;
1054   cs->set = set;
1055   GNUNET_SERVICE_client_continue(cs->client);
1056 }
1057
1058
1059 /**
1060  * Timeout happens iff:
1061  *  - we suggested an operation to our listener,
1062  *    but did not receive a response in time
1063  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1064  *
1065  * @param cls channel context
1066  * @param tc context information (why was this task triggered now)
1067  */
1068 static void
1069 incoming_timeout_cb(void *cls)
1070 {
1071   struct Operation *op = cls;
1072
1073   op->timeout_task = NULL;
1074   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1075              "Remote peer's incoming request timed out\n");
1076   incoming_destroy(op);
1077 }
1078
1079
1080 /**
1081  * Method called whenever another peer has added us to a channel the
1082  * other peer initiated.  Only called (once) upon reception of data
1083  * from a channel we listen on.
1084  *
1085  * The channel context represents the operation itself and gets added
1086  * to a DLL, from where it gets looked up when our local listener
1087  * client responds to a proposed/suggested operation or connects and
1088  * associates with this operation.
1089  *
1090  * @param cls closure
1091  * @param channel new handle to the channel
1092  * @param source peer that started the channel
1093  * @return initial channel context for the channel
1094  *         returns NULL on error
1095  */
1096 static void *
1097 channel_new_cb(void *cls,
1098                struct GNUNET_CADET_Channel *channel,
1099                const struct GNUNET_PeerIdentity *source)
1100 {
1101   struct Listener *listener = cls;
1102   struct Operation *op;
1103
1104   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
1105   op = GNUNET_new(struct Operation);
1106   op->listener = listener;
1107   op->peer = *source;
1108   op->channel = channel;
1109   op->mq = GNUNET_CADET_get_mq(op->channel);
1110   op->salt = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1111   op->timeout_task = GNUNET_SCHEDULER_add_delayed(INCOMING_CHANNEL_TIMEOUT,
1112                                                   &incoming_timeout_cb,
1113                                                   op);
1114   GNUNET_CONTAINER_DLL_insert(listener->op_head, listener->op_tail, op);
1115   return op;
1116 }
1117
1118
1119 /**
1120  * Function called whenever a channel is destroyed.  Should clean up
1121  * any associated state.  It must NOT call
1122  * GNUNET_CADET_channel_destroy() on the channel.
1123  *
1124  * The peer_disconnect function is part of a a virtual table set initially either
1125  * when a peer creates a new channel with us, or once we create
1126  * a new channel ourselves (evaluate).
1127  *
1128  * Once we know the exact type of operation (union/intersection), the vt is
1129  * replaced with an operation specific instance (_GSS_[op]_vt).
1130  *
1131  * @param channel_ctx place where local state associated
1132  *                   with the channel is stored
1133  * @param channel connection to the other end (henceforth invalid)
1134  */
1135 static void
1136 channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
1137 {
1138   struct Operation *op = channel_ctx;
1139
1140   op->channel = NULL;
1141   _GSS_operation_destroy2(op);
1142 }
1143
1144
/**
 * Tear down an operation, choosing the cleanup path from the
 * operation's current associations:
 *  - if a CADET channel is still attached, it is detached and destroyed;
 *  - if the operation still belongs to a listener (not yet accepted),
 *    it is handed to incoming_destroy() and nothing else is done here;
 *  - otherwise, if it is bound to a set, the set's operation-specific
 *    channel_death() handler runs; if not, _GSS_operation_destroy()
 *    is called; in both cases the operation is freed afterwards.
 *
 * This function probably should not exist
 * and be replaced by inlining more specific
 * logic in the various places where it is called.
 *
 * @param op operation to tear down; must not be used by the caller
 *        afterwards
 */
void
_GSS_operation_destroy2(struct Operation *op)
{
  struct GNUNET_CADET_Channel *channel;

  /* Note: this message is also emitted when we get here via
     handle_client_reject(), not only from channel_end_cb(). */
  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
  if (NULL != (channel = op->channel))
    {
      /* Called conditionally as this helper function is also called
         from within the channel disconnect handler, which clears
         op->channel first.
         NOTE(review): the previous comment here claimed that destroying
         the channel "will free op", yet op is still used below --
         confirm against the CADET API whether the disconnect handler
         can run as a result of this call. */
      op->channel = NULL;
      GNUNET_CADET_channel_destroy(channel);
    }
  if (NULL != op->listener)
    {
      /* still an incoming (not yet accepted) operation */
      incoming_destroy(op);
      return;
    }
  if (NULL != op->set)
    op->set->vt->channel_death(op);
  else
    _GSS_operation_destroy(op, GNUNET_YES);
  GNUNET_free(op);
}
1174
1175
/**
 * Function called whenever an MQ-channel's transmission window size changes.
 *
 * The first callback in an outgoing channel will be with a non-zero value
 * and will mean the channel is connected to the destination.
 *
 * For an incoming channel it will be called immediately after the
 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
 *
 * The notification is currently ignored.
 *
 * @param cls Channel closure.
 * @param channel Channel whose window size changed.
 * @param window_size New window size. If there are more messages than
 *                    buffer size, this value will be negative.
 */
static void
channel_window_cb(void *cls,
                  const struct GNUNET_CADET_Channel *channel,
                  int window_size)
{
  /* FIXME: not implemented, we could do flow control here... */
  (void)cls;
  (void)channel;
  (void)window_size;
}
1197
1198
/**
 * Called when a client wants to create a new listener.
 *
 * Creates the listener for the application id and operation type
 * given in the message, links it to the client and to the global
 * listener list, and opens the matching CADET port.  A client that
 * already has a listener is dropped.
 *
 * @param cls the `struct ClientState` of the client that sent the message
 * @param msg message sent by the client
 */
static void
handle_client_listen(void *cls, const struct GNUNET_SET_ListenMessage *msg)
{
  struct ClientState *cs = cls;
  /* Handlers for all P2P messages of both set operations.  They are
     registered with a NULL closure here (handle_client_evaluate()
     passes the operation instead); presumably CADET supplies the
     per-channel context returned by channel_new_cb() -- TODO confirm.
     Both the _SE and the _SEC message type are routed to the
     union_p2p_strata_estimator handlers. */
  struct GNUNET_MQ_MessageHandler cadet_handlers[] =
  { GNUNET_MQ_hd_var_size(incoming_msg,
                          GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
                          struct OperationRequestMessage,
                          NULL),
    GNUNET_MQ_hd_var_size(union_p2p_ibf,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
                          struct IBFMessage,
                          NULL),
    GNUNET_MQ_hd_var_size(union_p2p_elements,
                          GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
                          struct GNUNET_SET_ElementMessage,
                          NULL),
    GNUNET_MQ_hd_var_size(union_p2p_offer,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
                          struct GNUNET_MessageHeader,
                          NULL),
    GNUNET_MQ_hd_var_size(union_p2p_inquiry,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
                          struct InquiryMessage,
                          NULL),
    GNUNET_MQ_hd_var_size(union_p2p_demand,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
                          struct GNUNET_MessageHeader,
                          NULL),
    GNUNET_MQ_hd_fixed_size(union_p2p_done,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
                            struct GNUNET_MessageHeader,
                            NULL),
    GNUNET_MQ_hd_fixed_size(union_p2p_over,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
                            struct GNUNET_MessageHeader,
                            NULL),
    GNUNET_MQ_hd_fixed_size(union_p2p_full_done,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
                            struct GNUNET_MessageHeader,
                            NULL),
    GNUNET_MQ_hd_fixed_size(union_p2p_request_full,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
                            struct GNUNET_MessageHeader,
                            NULL),
    GNUNET_MQ_hd_var_size(union_p2p_strata_estimator,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
                          struct StrataEstimatorMessage,
                          NULL),
    GNUNET_MQ_hd_var_size(union_p2p_strata_estimator,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
                          struct StrataEstimatorMessage,
                          NULL),
    GNUNET_MQ_hd_var_size(union_p2p_full_element,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
                          struct GNUNET_SET_ElementMessage,
                          NULL),
    GNUNET_MQ_hd_fixed_size(intersection_p2p_element_info,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct IntersectionElementInfoMessage,
                            NULL),
    GNUNET_MQ_hd_var_size(intersection_p2p_bf,
                          GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
                          struct BFMessage,
                          NULL),
    GNUNET_MQ_hd_fixed_size(intersection_p2p_done,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
                            struct IntersectionDoneMessage,
                            NULL),
    GNUNET_MQ_handler_end() };
  struct Listener *listener;

  if (NULL != cs->listener)
    {
      /* max. one active listener per client! */
      GNUNET_break(0);
      GNUNET_SERVICE_client_drop(cs->client);
      return;
    }
  listener = GNUNET_new(struct Listener);
  /* link listener and client in both directions */
  listener->cs = cs;
  cs->listener = listener;
  listener->app_id = msg->app_id;
  listener->operation = (enum GNUNET_SET_OperationType)ntohl(msg->operation);
  GNUNET_CONTAINER_DLL_insert(listener_head, listener_tail, listener);
  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
             "New listener created (op %u, port %s)\n",
             listener->operation,
             GNUNET_h2s(&listener->app_id));
  /* The listener is the closure handed to channel_new_cb() for every
     channel arriving on this port.
     NOTE(review): the result is stored without a check -- confirm
     whether GNUNET_CADET_open_port() can fail and return NULL. */
  listener->open_port = GNUNET_CADET_open_port(cadet,
                                               &msg->app_id,
                                               &channel_new_cb,
                                               listener,
                                               &channel_window_cb,
                                               &channel_end_cb,
                                               cadet_handlers);
  GNUNET_SERVICE_client_continue(cs->client);
}
1303
1304
1305 /**
1306  * Called when the listening client rejects an operation
1307  * request by another peer.
1308  *
1309  * @param cls client that sent the message
1310  * @param msg message sent by the client
1311  */
1312 static void
1313 handle_client_reject(void *cls, const struct GNUNET_SET_RejectMessage *msg)
1314 {
1315   struct ClientState *cs = cls;
1316   struct Operation *op;
1317
1318   op = get_incoming(ntohl(msg->accept_reject_id));
1319   if (NULL == op)
1320     {
1321       /* no matching incoming operation for this reject;
1322          could be that the other peer already disconnected... */
1323       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1324                  "Client rejected unknown operation %u\n",
1325                  (unsigned int)ntohl(msg->accept_reject_id));
1326       GNUNET_SERVICE_client_continue(cs->client);
1327       return;
1328     }
1329   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1330              "Peer request (op %u, app %s) rejected by client\n",
1331              op->listener->operation,
1332              GNUNET_h2s(&cs->listener->app_id));
1333   _GSS_operation_destroy2(op);
1334   GNUNET_SERVICE_client_continue(cs->client);
1335 }
1336
1337
1338 /**
1339  * Called when a client wants to add or remove an element to a set it inhabits.
1340  *
1341  * @param cls client that sent the message
1342  * @param msg message sent by the client
1343  */
1344 static int
1345 check_client_mutation(void *cls, const struct GNUNET_SET_ElementMessage *msg)
1346 {
1347   /* NOTE: Technically, we should probably check with the
1348      block library whether the element we are given is well-formed */
1349   return GNUNET_OK;
1350 }
1351
1352
1353 /**
1354  * Called when a client wants to add or remove an element to a set it inhabits.
1355  *
1356  * @param cls client that sent the message
1357  * @param msg message sent by the client
1358  */
1359 static void
1360 handle_client_mutation(void *cls, const struct GNUNET_SET_ElementMessage *msg)
1361 {
1362   struct ClientState *cs = cls;
1363   struct Set *set;
1364
1365   if (NULL == (set = cs->set))
1366     {
1367       /* client without a set requested an operation */
1368       GNUNET_break(0);
1369       GNUNET_SERVICE_client_drop(cs->client);
1370       return;
1371     }
1372   GNUNET_SERVICE_client_continue(cs->client);
1373
1374   if (0 != set->content->iterator_count)
1375     {
1376       struct PendingMutation *pm;
1377
1378       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
1379       pm = GNUNET_new(struct PendingMutation);
1380       pm->msg =
1381         (struct GNUNET_SET_ElementMessage *)GNUNET_copy_message(&msg->header);
1382       pm->set = set;
1383       GNUNET_CONTAINER_DLL_insert_tail(set->content->pending_mutations_head,
1384                                        set->content->pending_mutations_tail,
1385                                        pm);
1386       return;
1387     }
1388   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
1389   execute_mutation(set, msg);
1390 }
1391
1392
1393 /**
1394  * Advance the current generation of a set,
1395  * adding exclusion ranges if necessary.
1396  *
1397  * @param set the set where we want to advance the generation
1398  */
1399 static void
1400 advance_generation(struct Set *set)
1401 {
1402   struct GenerationRange r;
1403
1404   if (set->current_generation == set->content->latest_generation)
1405     {
1406       set->content->latest_generation++;
1407       set->current_generation++;
1408       return;
1409     }
1410
1411   GNUNET_assert(set->current_generation < set->content->latest_generation);
1412
1413   r.start = set->current_generation + 1;
1414   r.end = set->content->latest_generation + 1;
1415   set->content->latest_generation = r.end;
1416   set->current_generation = r.end;
1417   GNUNET_array_append(set->excluded_generations,
1418                       set->excluded_generations_size,
1419                       r);
1420 }
1421
1422
1423 /**
1424  * Called when a client wants to initiate a set operation with another
1425  * peer.  Initiates the CADET connection to the listener and sends the
1426  * request.
1427  *
1428  * @param cls client that sent the message
1429  * @param msg message sent by the client
1430  * @return #GNUNET_OK if the message is well-formed
1431  */
1432 static int
1433 check_client_evaluate(void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1434 {
1435   /* FIXME: suboptimal, even if the context below could be NULL,
1436      there are malformed messages this does not check for... */
1437   return GNUNET_OK;
1438 }
1439
1440
/**
 * Called when a client wants to initiate a set operation with another
 * peer.  Initiates the CADET connection to the listener and sends the
 * request.
 *
 * A new operation is created from the parameters in the message and
 * attached to the client's set, the set's generation is advanced, a
 * CADET channel to the target peer is created and the set's
 * operation-specific evaluate() function is invoked.  The client is
 * dropped if it has no set or if evaluate() returns no state.
 *
 * @param cls the `struct ClientState` of the client that sent the message
 * @param msg message sent by the client
 */
static void
handle_client_evaluate(void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
{
  struct ClientState *cs = cls;
  /* Allocated up front, as the handler table below uses the operation
     as the closure of every handler. */
  struct Operation *op = GNUNET_new(struct Operation);
  /* Same set of P2P handlers as in handle_client_listen(), but bound
     to this operation.  Both the _SE and the _SEC message type are
     routed to the union_p2p_strata_estimator handlers. */
  const struct GNUNET_MQ_MessageHandler cadet_handlers[] =
  { GNUNET_MQ_hd_var_size(incoming_msg,
                          GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
                          struct OperationRequestMessage,
                          op),
    GNUNET_MQ_hd_var_size(union_p2p_ibf,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
                          struct IBFMessage,
                          op),
    GNUNET_MQ_hd_var_size(union_p2p_elements,
                          GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
                          struct GNUNET_SET_ElementMessage,
                          op),
    GNUNET_MQ_hd_var_size(union_p2p_offer,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
                          struct GNUNET_MessageHeader,
                          op),
    GNUNET_MQ_hd_var_size(union_p2p_inquiry,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
                          struct InquiryMessage,
                          op),
    GNUNET_MQ_hd_var_size(union_p2p_demand,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
                          struct GNUNET_MessageHeader,
                          op),
    GNUNET_MQ_hd_fixed_size(union_p2p_done,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
                            struct GNUNET_MessageHeader,
                            op),
    GNUNET_MQ_hd_fixed_size(union_p2p_over,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
                            struct GNUNET_MessageHeader,
                            op),
    GNUNET_MQ_hd_fixed_size(union_p2p_full_done,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
                            struct GNUNET_MessageHeader,
                            op),
    GNUNET_MQ_hd_fixed_size(union_p2p_request_full,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
                            struct GNUNET_MessageHeader,
                            op),
    GNUNET_MQ_hd_var_size(union_p2p_strata_estimator,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
                          struct StrataEstimatorMessage,
                          op),
    GNUNET_MQ_hd_var_size(union_p2p_strata_estimator,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
                          struct StrataEstimatorMessage,
                          op),
    GNUNET_MQ_hd_var_size(union_p2p_full_element,
                          GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
                          struct GNUNET_SET_ElementMessage,
                          op),
    GNUNET_MQ_hd_fixed_size(intersection_p2p_element_info,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct IntersectionElementInfoMessage,
                            op),
    GNUNET_MQ_hd_var_size(intersection_p2p_bf,
                          GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
                          struct BFMessage,
                          op),
    GNUNET_MQ_hd_fixed_size(intersection_p2p_done,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
                            struct IntersectionDoneMessage,
                            op),
    GNUNET_MQ_handler_end() };
  struct Set *set;
  const struct GNUNET_MessageHeader *context;

  if (NULL == (set = cs->set))
    {
      /* client without a set requested an operation; release the
         operation allocated above before giving up */
      GNUNET_break(0);
      GNUNET_free(op);
      GNUNET_SERVICE_client_drop(cs->client);
      return;
    }
  op->salt = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
  op->peer = msg->target_peer;
  op->result_mode = ntohl(msg->result_mode);
  op->client_request_id = ntohl(msg->request_id);
  op->byzantine = msg->byzantine;
  op->byzantine_lower_bound = msg->byzantine_lower_bound;
  op->force_full = msg->force_full;
  op->force_delta = msg->force_delta;
  /* presumably NULL if the client attached no context message (see
     the FIXME in check_client_evaluate()) -- TODO confirm */
  context = GNUNET_MQ_extract_nested_mh(msg);

  /* Advance generation values, so that
     mutations won't interfere with the running operation.
     The operation records the generation that was current before
     the set is advanced. */
  op->set = set;
  op->generation_created = set->current_generation;
  advance_generation(set);
  GNUNET_CONTAINER_DLL_insert(set->ops_head, set->ops_tail, op);
  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
             "Creating new CADET channel to port %s for set operation type %u\n",
             GNUNET_h2s(&msg->app_id),
             set->operation);
  op->channel = GNUNET_CADET_channel_create(cadet,
                                            op,
                                            &msg->target_peer,
                                            &msg->app_id,
                                            &channel_window_cb,
                                            &channel_end_cb,
                                            cadet_handlers);
  op->mq = GNUNET_CADET_get_mq(op->channel);
  op->state = set->vt->evaluate(op, context);
  if (NULL == op->state)
    {
      /* NOTE(review): op stays linked in the set's operation list and
         keeps its channel here; presumably both are cleaned up when
         the dropped client's set is destroyed -- confirm. */
      GNUNET_break(0);
      GNUNET_SERVICE_client_drop(cs->client);
      return;
    }
  GNUNET_SERVICE_client_continue(cs->client);
}
1567
1568
1569 /**
1570  * Handle an ack from a client, and send the next element. Note
1571  * that we only expect acks for set elements, not after the
1572  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1573  *
1574  * @param cls client the client
1575  * @param ack the message
1576  */
1577 static void
1578 handle_client_iter_ack(void *cls, const struct GNUNET_SET_IterAckMessage *ack)
1579 {
1580   struct ClientState *cs = cls;
1581   struct Set *set;
1582
1583   if (NULL == (set = cs->set))
1584     {
1585       /* client without a set acknowledged receiving a value */
1586       GNUNET_break(0);
1587       GNUNET_SERVICE_client_drop(cs->client);
1588       return;
1589     }
1590   if (NULL == set->iter)
1591     {
1592       /* client sent an ack, but we were not expecting one (as
1593          set iteration has finished) */
1594       GNUNET_break(0);
1595       GNUNET_SERVICE_client_drop(cs->client);
1596       return;
1597     }
1598   GNUNET_SERVICE_client_continue(cs->client);
1599   if (ntohl(ack->send_more))
1600     {
1601       send_client_element(set);
1602     }
1603   else
1604     {
1605       GNUNET_CONTAINER_multihashmap_iterator_destroy(set->iter);
1606       set->iter = NULL;
1607       set->iteration_id++;
1608     }
1609 }
1610
1611
1612 /**
1613  * Handle a request from the client to copy a set.
1614  *
1615  * @param cls the client
1616  * @param mh the message
1617  */
1618 static void
1619 handle_client_copy_lazy_prepare(void *cls,
1620                                 const struct GNUNET_MessageHeader *mh)
1621 {
1622   struct ClientState *cs = cls;
1623   struct Set *set;
1624   struct LazyCopyRequest *cr;
1625   struct GNUNET_MQ_Envelope *ev;
1626   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1627
1628   if (NULL == (set = cs->set))
1629     {
1630       /* client without a set requested an operation */
1631       GNUNET_break(0);
1632       GNUNET_SERVICE_client_drop(cs->client);
1633       return;
1634     }
1635   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1636              "Client requested creation of lazy copy\n");
1637   cr = GNUNET_new(struct LazyCopyRequest);
1638   cr->cookie = ++lazy_copy_cookie;
1639   cr->source_set = set;
1640   GNUNET_CONTAINER_DLL_insert(lazy_copy_head, lazy_copy_tail, cr);
1641   ev = GNUNET_MQ_msg(resp_msg, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1642   resp_msg->cookie = cr->cookie;
1643   GNUNET_MQ_send(set->cs->mq, ev);
1644   GNUNET_SERVICE_client_continue(cs->client);
1645 }
1646
1647
1648 /**
1649  * Handle a request from the client to connect to a copy of a set.
1650  *
1651  * @param cls the client
1652  * @param msg the message
1653  */
1654 static void
1655 handle_client_copy_lazy_connect(
1656   void *cls,
1657   const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1658 {
1659   struct ClientState *cs = cls;
1660   struct LazyCopyRequest *cr;
1661   struct Set *set;
1662   int found;
1663
1664   if (NULL != cs->set)
1665     {
1666       /* There can only be one set per client */
1667       GNUNET_break(0);
1668       GNUNET_SERVICE_client_drop(cs->client);
1669       return;
1670     }
1671   found = GNUNET_NO;
1672   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1673     {
1674       if (cr->cookie == msg->cookie)
1675         {
1676           found = GNUNET_YES;
1677           break;
1678         }
1679     }
1680   if (GNUNET_NO == found)
1681     {
1682       /* client asked for copy with cookie we don't know */
1683       GNUNET_break(0);
1684       GNUNET_SERVICE_client_drop(cs->client);
1685       return;
1686     }
1687   GNUNET_CONTAINER_DLL_remove(lazy_copy_head, lazy_copy_tail, cr);
1688   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1689              "Client %p requested use of lazy copy\n",
1690              cs);
1691   set = GNUNET_new(struct Set);
1692   switch (cr->source_set->operation)
1693     {
1694     case GNUNET_SET_OPERATION_INTERSECTION:
1695       set->vt = _GSS_intersection_vt();
1696       break;
1697
1698     case GNUNET_SET_OPERATION_UNION:
1699       set->vt = _GSS_union_vt();
1700       break;
1701
1702     default:
1703       GNUNET_assert(0);
1704       return;
1705     }
1706
1707   if (NULL == set->vt->copy_state)
1708     {
1709       /* Lazy copy not supported for this set operation */
1710       GNUNET_break(0);
1711       GNUNET_free(set);
1712       GNUNET_free(cr);
1713       GNUNET_SERVICE_client_drop(cs->client);
1714       return;
1715     }
1716
1717   set->operation = cr->source_set->operation;
1718   set->state = set->vt->copy_state(cr->source_set->state);
1719   set->content = cr->source_set->content;
1720   set->content->refcount++;
1721
1722   set->current_generation = cr->source_set->current_generation;
1723   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1724   set->excluded_generations =
1725     GNUNET_memdup(cr->source_set->excluded_generations,
1726                   set->excluded_generations_size *
1727                   sizeof(struct GenerationRange));
1728
1729   /* Advance the generation of the new set, so that mutations to the
1730      of the cloned set and the source set are independent. */
1731   advance_generation(set);
1732   set->cs = cs;
1733   cs->set = set;
1734   GNUNET_free(cr);
1735   GNUNET_SERVICE_client_continue(cs->client);
1736 }
1737
1738
1739 /**
1740  * Handle a request from the client to cancel a running set operation.
1741  *
1742  * @param cls the client
1743  * @param msg the message
1744  */
1745 static void
1746 handle_client_cancel(void *cls, const struct GNUNET_SET_CancelMessage *msg)
1747 {
1748   struct ClientState *cs = cls;
1749   struct Set *set;
1750   struct Operation *op;
1751   int found;
1752
1753   if (NULL == (set = cs->set))
1754     {
1755       /* client without a set requested an operation */
1756       GNUNET_break(0);
1757       GNUNET_SERVICE_client_drop(cs->client);
1758       return;
1759     }
1760   found = GNUNET_NO;
1761   for (op = set->ops_head; NULL != op; op = op->next)
1762     {
1763       if (op->client_request_id == ntohl(msg->request_id))
1764         {
1765           found = GNUNET_YES;
1766           break;
1767         }
1768     }
1769   if (GNUNET_NO == found)
1770     {
1771       /* It may happen that the operation was already destroyed due to
1772        * the other peer disconnecting.  The client may not know about this
1773        * yet and try to cancel the (just barely non-existent) operation.
1774        * So this is not a hard error.
1775        */
1776       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1777                  "Client canceled non-existent op %u\n",
1778                  (uint32_t)ntohl(msg->request_id));
1779     }
1780   else
1781     {
1782       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1783                  "Client requested cancel for op %u\n",
1784                  (uint32_t)ntohl(msg->request_id));
1785       _GSS_operation_destroy(op, GNUNET_YES);
1786     }
1787   GNUNET_SERVICE_client_continue(cs->client);
1788 }
1789
1790
1791 /**
1792  * Handle a request from the client to accept a set operation that
1793  * came from a remote peer.  We forward the accept to the associated
1794  * operation for handling
1795  *
1796  * @param cls the client
1797  * @param msg the message
1798  */
1799 static void
1800 handle_client_accept(void *cls, const struct GNUNET_SET_AcceptMessage *msg)
1801 {
1802   struct ClientState *cs = cls;
1803   struct Set *set;
1804   struct Operation *op;
1805   struct GNUNET_SET_ResultMessage *result_message;
1806   struct GNUNET_MQ_Envelope *ev;
1807   struct Listener *listener;
1808
1809   if (NULL == (set = cs->set))
1810     {
1811       /* client without a set requested to accept */
1812       GNUNET_break(0);
1813       GNUNET_SERVICE_client_drop(cs->client);
1814       return;
1815     }
1816   op = get_incoming(ntohl(msg->accept_reject_id));
1817   if (NULL == op)
1818     {
1819       /* It is not an error if the set op does not exist -- it may
1820       * have been destroyed when the partner peer disconnected. */
1821       GNUNET_log(
1822         GNUNET_ERROR_TYPE_INFO,
1823         "Client %p accepted request %u of listener %p that is no longer active\n",
1824         cs,
1825         ntohl(msg->accept_reject_id),
1826         cs->listener);
1827       ev = GNUNET_MQ_msg(result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
1828       result_message->request_id = msg->request_id;
1829       result_message->result_status = htons(GNUNET_SET_STATUS_FAILURE);
1830       GNUNET_MQ_send(set->cs->mq, ev);
1831       GNUNET_SERVICE_client_continue(cs->client);
1832       return;
1833     }
1834   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1835              "Client accepting request %u\n",
1836              (uint32_t)ntohl(msg->accept_reject_id));
1837   listener = op->listener;
1838   op->listener = NULL;
1839   GNUNET_CONTAINER_DLL_remove(listener->op_head, listener->op_tail, op);
1840   op->set = set;
1841   GNUNET_CONTAINER_DLL_insert(set->ops_head, set->ops_tail, op);
1842   op->client_request_id = ntohl(msg->request_id);
1843   op->result_mode = ntohl(msg->result_mode);
1844   op->byzantine = msg->byzantine;
1845   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1846   op->force_full = msg->force_full;
1847   op->force_delta = msg->force_delta;
1848
1849   /* Advance generation values, so that future mutations do not
1850      interfer with the running operation. */
1851   op->generation_created = set->current_generation;
1852   advance_generation(set);
1853   GNUNET_assert(NULL == op->state);
1854   op->state = set->vt->accept(op);
1855   if (NULL == op->state)
1856     {
1857       GNUNET_break(0);
1858       GNUNET_SERVICE_client_drop(cs->client);
1859       return;
1860     }
1861   /* Now allow CADET to continue, as we did not do this in
1862    #handle_incoming_msg (as we wanted to first see if the
1863      local client would accept the request). */
1864   GNUNET_CADET_receive_done(op->channel);
1865   GNUNET_SERVICE_client_continue(cs->client);
1866 }
1867
1868
1869 /**
1870  * Called to clean up, after a shutdown has been requested.
1871  *
1872  * @param cls closure, NULL
1873  */
1874 static void
1875 shutdown_task(void *cls)
1876 {
1877   /* Delay actual shutdown to allow service to disconnect clients */
1878   in_shutdown = GNUNET_YES;
1879   if (0 == num_clients)
1880     {
1881       if (NULL != cadet)
1882         {
1883           GNUNET_CADET_disconnect(cadet);
1884           cadet = NULL;
1885         }
1886     }
1887   GNUNET_STATISTICS_destroy(_GSS_statistics, GNUNET_YES);
1888   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1889 }
1890
1891
1892 /**
1893  * Function called by the service's run
1894  * method to run service-specific setup code.
1895  *
1896  * @param cls closure
1897  * @param cfg configuration to use
1898  * @param service the initialized service
1899  */
1900 static void
1901 run(void *cls,
1902     const struct GNUNET_CONFIGURATION_Handle *cfg,
1903     struct GNUNET_SERVICE_Handle *service)
1904 {
1905   /* FIXME: need to modify SERVICE (!) API to allow
1906      us to run a shutdown task *after* clients were
1907      forcefully disconnected! */
1908   GNUNET_SCHEDULER_add_shutdown(&shutdown_task, NULL);
1909   _GSS_statistics = GNUNET_STATISTICS_create("set", cfg);
1910   cadet = GNUNET_CADET_connect(cfg);
1911   if (NULL == cadet)
1912     {
1913       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1914                  _("Could not connect to CADET service\n"));
1915       GNUNET_SCHEDULER_shutdown();
1916       return;
1917     }
1918 }
1919
1920
1921 /**
1922  * Define "main" method using service macro.
1923  */
1924 GNUNET_SERVICE_MAIN(
1925   "set",
1926   GNUNET_SERVICE_OPTION_NONE,
1927   &run,
1928   &client_connect_cb,
1929   &client_disconnect_cb,
1930   NULL,
1931   GNUNET_MQ_hd_fixed_size(client_accept,
1932                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1933                           struct GNUNET_SET_AcceptMessage,
1934                           NULL),
1935   GNUNET_MQ_hd_fixed_size(client_iter_ack,
1936                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1937                           struct GNUNET_SET_IterAckMessage,
1938                           NULL),
1939   GNUNET_MQ_hd_var_size(client_mutation,
1940                         GNUNET_MESSAGE_TYPE_SET_ADD,
1941                         struct GNUNET_SET_ElementMessage,
1942                         NULL),
1943   GNUNET_MQ_hd_fixed_size(client_create_set,
1944                           GNUNET_MESSAGE_TYPE_SET_CREATE,
1945                           struct GNUNET_SET_CreateMessage,
1946                           NULL),
1947   GNUNET_MQ_hd_fixed_size(client_iterate,
1948                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1949                           struct GNUNET_MessageHeader,
1950                           NULL),
1951   GNUNET_MQ_hd_var_size(client_evaluate,
1952                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1953                         struct GNUNET_SET_EvaluateMessage,
1954                         NULL),
1955   GNUNET_MQ_hd_fixed_size(client_listen,
1956                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
1957                           struct GNUNET_SET_ListenMessage,
1958                           NULL),
1959   GNUNET_MQ_hd_fixed_size(client_reject,
1960                           GNUNET_MESSAGE_TYPE_SET_REJECT,
1961                           struct GNUNET_SET_RejectMessage,
1962                           NULL),
1963   GNUNET_MQ_hd_var_size(client_mutation,
1964                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
1965                         struct GNUNET_SET_ElementMessage,
1966                         NULL),
1967   GNUNET_MQ_hd_fixed_size(client_cancel,
1968                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
1969                           struct GNUNET_SET_CancelMessage,
1970                           NULL),
1971   GNUNET_MQ_hd_fixed_size(client_copy_lazy_prepare,
1972                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1973                           struct GNUNET_MessageHeader,
1974                           NULL),
1975   GNUNET_MQ_hd_fixed_size(client_copy_lazy_connect,
1976                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1977                           struct GNUNET_SET_CopyLazyConnectMessage,
1978                           NULL),
1979   GNUNET_MQ_handler_end());
1980
1981
1982 /* end of gnunet-service-set.c */