- fix CADET-using services
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013, 2014 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_protocol.h"
28 #include "gnunet_statistics_service.h"
29
30 /**
31  * How long do we hold on to an incoming channel if there is
32  * no local listener before giving up?
33  */
34 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35
36 /**
37  * A listener is inhabited by a client, and waits for evaluation
38  * requests from remote peers.
39  */
40 struct Listener
41 {
42   /**
43    * Listeners are held in a doubly linked list.
44    */
45   struct Listener *next;
46
47   /**
48    * Listeners are held in a doubly linked list.
49    */
50   struct Listener *prev;
51
52   /**
53    * Client that owns the listener.
54    * Only one client may own a listener.
55    */
56   struct GNUNET_SERVER_Client *client;
57
58   /**
59    * Message queue for the client
60    */
61   struct GNUNET_MQ_Handle *client_mq;
62
63   /**
64    * Application ID for the operation, used to distinguish
65    * multiple operations of the same type with the same peer.
66    */
67   struct GNUNET_HashCode app_id;
68
69   /**
70    * The type of the operation.
71    */
72   enum GNUNET_SET_OperationType operation;
73 };
74
75
76 struct LazyCopyRequest
77 {
78   struct Set *source_set;
79   uint32_t cookie;
80
81   struct LazyCopyRequest *prev;
82   struct LazyCopyRequest *next;
83 };
84
85
86 /**
87  * Configuration of our local peer.
88  */
89 static const struct GNUNET_CONFIGURATION_Handle *configuration;
90
91 /**
92  * Handle to the cadet service, used to listen for and connect to
93  * remote peers.
94  */
95 static struct GNUNET_CADET_Handle *cadet;
96
97 /**
98  * Sets are held in a doubly linked list.
99  */
100 static struct Set *sets_head;
101
102 /**
103  * Sets are held in a doubly linked list.
104  */
105 static struct Set *sets_tail;
106
107 /**
108  * Listeners are held in a doubly linked list.
109  */
110 static struct Listener *listeners_head;
111
112 /**
113  * Listeners are held in a doubly linked list.
114  */
115 static struct Listener *listeners_tail;
116
117 /**
118  * Incoming sockets from remote peers are held in a doubly linked
119  * list.
120  */
121 static struct Operation *incoming_head;
122
123 /**
124  * Incoming sockets from remote peers are held in a doubly linked
125  * list.
126  */
127 static struct Operation *incoming_tail;
128
129 static struct LazyCopyRequest *lazy_copy_head;
130 static struct LazyCopyRequest *lazy_copy_tail;
131
132 static uint32_t lazy_copy_cookie = 1;
133
134 /**
135  * Counter for allocating unique IDs for clients, used to identify
136  * incoming operation requests from remote peers, that the client can
137  * choose to accept or refuse.
138  */
139 static uint32_t suggest_id = 1;
140
141 /**
142  * Statistics handle.
143  */
144 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
145
146
147 /**
148  * Get set that is owned by the given client, if any.
149  *
150  * @param client client to look for
151  * @return set that the client owns, NULL if the client
152  *         does not own a set
153  */
154 static struct Set *
155 set_get (struct GNUNET_SERVER_Client *client)
156 {
157   struct Set *set;
158
159   for (set = sets_head; NULL != set; set = set->next)
160     if (set->client == client)
161       return set;
162   return NULL;
163 }
164
165
166 /**
167  * Get the listener associated with the given client, if any.
168  *
169  * @param client the client
170  * @return listener associated with the client, NULL
171  *         if there isn't any
172  */
173 static struct Listener *
174 listener_get (struct GNUNET_SERVER_Client *client)
175 {
176   struct Listener *listener;
177
178   for (listener = listeners_head; NULL != listener; listener = listener->next)
179     if (listener->client == client)
180       return listener;
181   return NULL;
182 }
183
184
185 /**
186  * Get the incoming socket associated with the given id.
187  *
188  * @param id id to look for
189  * @return the incoming socket associated with the id,
190  *         or NULL if there is none
191  */
192 static struct Operation *
193 get_incoming (uint32_t id)
194 {
195   struct Operation *op;
196
197   for (op = incoming_head; NULL != op; op = op->next)
198     if (op->suggest_id == id)
199     {
200       GNUNET_assert (GNUNET_YES == op->is_incoming);
201       return op;
202     }
203   return NULL;
204 }
205
206
207 /**
208  * Destroy a listener, free all resources associated with it.
209  *
210  * @param listener listener to destroy
211  */
212 static void
213 listener_destroy (struct Listener *listener)
214 {
215   /* If the client is not dead yet, destroy it.
216    * The client's destroy callback will destroy the listener again. */
217   if (NULL != listener->client)
218   {
219     struct GNUNET_SERVER_Client *client = listener->client;
220
221     listener->client = NULL;
222     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
223                 "Disconnecting listener client\n");
224     GNUNET_SERVER_client_disconnect (client);
225     return;
226   }
227   if (NULL != listener->client_mq)
228   {
229     GNUNET_MQ_destroy (listener->client_mq);
230     listener->client_mq = NULL;
231   }
232   GNUNET_CONTAINER_DLL_remove (listeners_head,
233                                listeners_tail,
234                                listener);
235   GNUNET_free (listener);
236 }
237
238
239 /**
240  * Context for the #garbage_collect_cb().
241  */
242 struct GarbageContext
243 {
244
245   /**
246    * Map for which we are garbage collecting removed elements.
247    */
248   struct GNUNET_CONTAINER_MultiHashMap *map;
249
250   /**
251    * Lowest generation for which an operation is still pending.
252    */
253   unsigned int min_op_generation;
254
255   /**
256    * Largest generation for which an operation is still pending.
257    */
258   unsigned int max_op_generation;
259
260 };
261
262
263 /**
264  * Function invoked to check if an element can be removed from
265  * the set's history because it is no longer needed.
266  *
267  * @param cls the `struct GarbageContext *`
268  * @param key key of the element in the map
269  * @param value the `struct ElementEntry *`
270  * @return #GNUNET_OK (continue to iterate)
271  */
272 static int
273 garbage_collect_cb (void *cls,
274                     const struct GNUNET_HashCode *key,
275                     void *value)
276 {
277   //struct GarbageContext *gc = cls;
278   //struct ElementEntry *ee = value;
279
280   //if (GNUNET_YES != ee->removed)
281   //  return GNUNET_OK;
282   //if ( (gc->max_op_generation < ee->generation_added) ||
283   //     (ee->generation_removed > gc->min_op_generation) )
284   //{
285   //  GNUNET_assert (GNUNET_YES ==
286   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
287   //                                                       key,
288   //                                                       ee));
289   //  GNUNET_free (ee);
290   //}
291   return GNUNET_OK;
292 }
293
294
295 /**
296  * Collect and destroy elements that are not needed anymore, because
297  * their lifetime (as determined by their generation) does not overlap
298  * with any active set operation.
299  *
300  * @param set set to garbage collect
301  */
302 static void
303 collect_generation_garbage (struct Set *set)
304 {
305   struct Operation *op;
306   struct GarbageContext gc;
307
308   gc.min_op_generation = UINT_MAX;
309   gc.max_op_generation = 0;
310   for (op = set->ops_head; NULL != op; op = op->next)
311   {
312     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
313                                        op->generation_created);
314     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
315                                        op->generation_created);
316   }
317   gc.map = set->content->elements;
318   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
319                                          &garbage_collect_cb,
320                                          &gc);
321 }
322
323 int
324 is_excluded_generation (unsigned int generation,
325                         struct GenerationRange *excluded,
326                         unsigned int excluded_size)
327 {
328   unsigned int i;
329
330   for (i = 0; i < excluded_size; i++)
331   {
332     if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
333       return GNUNET_YES;
334   }
335
336   return GNUNET_NO;
337 }
338
339
340 int
341 is_element_of_generation (struct ElementEntry *ee,
342                           unsigned int query_generation,
343                           struct GenerationRange *excluded,
344                           unsigned int excluded_size)
345 {
346   struct MutationEvent *mut;
347   int is_present;
348   unsigned int i;
349
350   GNUNET_assert (NULL != ee->mutations);
351
352   if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size))
353   {
354     GNUNET_break (0);
355     return GNUNET_NO;
356   }
357
358   is_present = GNUNET_NO;
359
360   /* Could be made faster with binary search, but lists
361      are small, so why bother. */
362   for (i = 0; i < ee->mutations_size; i++)
363   {
364     mut = &ee->mutations[i];
365
366     if (mut->generation > query_generation)
367     {
368       /* The mutation doesn't apply to our generation
369          anymore.  We can'b break here, since mutations aren't
370          sorted by generation. */
371       continue;
372     }
373
374     if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size))
375     {
376       /* The generation is excluded (because it belongs to another
377          fork via a lazy copy) and thus mutations aren't considered
378          for membership testing. */
379       continue;
380     }
381
382     /* This would be an inconsistency in how we manage mutations. */
383     if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) )
384       GNUNET_assert (0);
385
386     /* Likewise. */
387     if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) )
388       GNUNET_assert (0);
389
390     is_present = mut->added;
391   }
392
393   return is_present;
394 }
395
396
397 int
398 _GSS_is_element_of_set (struct ElementEntry *ee,
399                         struct Set *set)
400 {
401   return is_element_of_generation (ee,
402                                    set->current_generation,
403                                    set->excluded_generations,
404                                    set->excluded_generations_size);
405 }
406
407
408 static int
409 is_element_of_iteration (struct ElementEntry *ee,
410                          struct Set *set)
411 {
412   return is_element_of_generation (ee,
413                                    set->iter_generation,
414                                    set->excluded_generations,
415                                    set->excluded_generations_size);
416 }
417
418
419 int
420 _GSS_is_element_of_operation (struct ElementEntry *ee,
421                               struct Operation *op)
422 {
423   return is_element_of_generation (ee,
424                                    op->generation_created,
425                                    op->spec->set->excluded_generations,
426                                    op->spec->set->excluded_generations_size);
427 }
428
429
430 /**
431  * Destroy the given operation.  Call the implementation-specific
432  * cancel function of the operation.  Disconnects from the remote
433  * peer.  Does not disconnect the client, as there may be multiple
434  * operations per set.
435  *
436  * @param op operation to destroy
437  * @param gc #GNUNET_YES to perform garbage collection on the set
438  */
439 void
440 _GSS_operation_destroy (struct Operation *op,
441                         int gc)
442 {
443   struct Set *set;
444   struct GNUNET_CADET_Channel *channel;
445
446   if (NULL == op->vt)
447   {
448     /* already in #_GSS_operation_destroy() */
449     return;
450   }
451   GNUNET_assert (GNUNET_NO == op->is_incoming);
452   GNUNET_assert (NULL != op->spec);
453   set = op->spec->set;
454   GNUNET_CONTAINER_DLL_remove (set->ops_head,
455                                set->ops_tail,
456                                op);
457   op->vt->cancel (op);
458   op->vt = NULL;
459   if (NULL != op->spec)
460   {
461     if (NULL != op->spec->context_msg)
462     {
463       GNUNET_free (op->spec->context_msg);
464       op->spec->context_msg = NULL;
465     }
466     GNUNET_free (op->spec);
467     op->spec = NULL;
468   }
469   if (NULL != op->mq)
470   {
471     GNUNET_MQ_destroy (op->mq);
472     op->mq = NULL;
473   }
474   if (NULL != (channel = op->channel))
475   {
476     op->channel = NULL;
477     GNUNET_CADET_channel_destroy (channel);
478   }
479   if (GNUNET_YES == gc)
480     collect_generation_garbage (set);
481   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
482    * there was a channel end handler that will free 'op' on the call stack. */
483 }
484
485
486 /**
487  * Iterator over hash map entries to free element entries.
488  *
489  * @param cls closure
490  * @param key current key code
491  * @param value a `struct ElementEntry *` to be free'd
492  * @return #GNUNET_YES (continue to iterate)
493  */
494 static int
495 destroy_elements_iterator (void *cls,
496                            const struct GNUNET_HashCode *key,
497                            void *value)
498 {
499   struct ElementEntry *ee = value;
500
501   GNUNET_free_non_null (ee->mutations);
502
503   GNUNET_free (ee);
504   return GNUNET_YES;
505 }
506
507
508 /**
509  * Destroy a set, and free all resources and operations associated with it.
510  *
511  * @param set the set to destroy
512  */
513 static void
514 set_destroy (struct Set *set)
515 {
516   if (NULL != set->client)
517   {
518     /* If the client is not dead yet, destroy it.  The client's destroy
519      * callback will call `set_destroy()` again in this case.  We do
520      * this so that the channel end handler still has a valid set handle
521      * to destroy. */
522     struct GNUNET_SERVER_Client *client = set->client;
523
524     set->client = NULL;
525     GNUNET_SERVER_client_disconnect (client);
526     return;
527   }
528   GNUNET_assert (NULL != set->state);
529   while (NULL != set->ops_head)
530     _GSS_operation_destroy (set->ops_head, GNUNET_NO);
531   set->vt->destroy_set (set->state);
532   set->state = NULL;
533   if (NULL != set->client_mq)
534   {
535     GNUNET_MQ_destroy (set->client_mq);
536     set->client_mq = NULL;
537   }
538   if (NULL != set->iter)
539   {
540     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
541     set->iter = NULL;
542     set->iteration_id++;
543   }
544   {
545     struct SetContent *content;
546     struct PendingMutation *pm;
547     struct PendingMutation *pm_current;
548
549     content = set->content;
550
551     // discard any pending mutations that reference this set
552     pm = content->pending_mutations_head;
553     while (NULL != pm)
554     {
555       pm_current = pm;
556       pm = pm->next;
557       if (pm_current-> set == set)
558         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
559                                      content->pending_mutations_tail,
560                                      pm_current);
561
562     }
563
564     set->content = NULL;
565     GNUNET_assert (0 != content->refcount);
566     content->refcount -= 1;
567     if (0 == content->refcount)
568     {
569       GNUNET_assert (NULL != content->elements);
570       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
571                                              &destroy_elements_iterator,
572                                              NULL);
573       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
574       content->elements = NULL;
575       GNUNET_free (content);
576     }
577   }
578   GNUNET_free_non_null (set->excluded_generations);
579   set->excluded_generations = NULL;
580   GNUNET_CONTAINER_DLL_remove (sets_head,
581                                sets_tail,
582                                set);
583
584   // remove set from pending copy requests
585   {
586     struct LazyCopyRequest *lcr;
587     lcr = lazy_copy_head;
588     while (NULL != lcr)
589     {
590       struct LazyCopyRequest *lcr_current;
591       lcr_current = lcr;
592       lcr = lcr->next;
593       if (lcr_current->source_set == set)
594         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
595                                      lazy_copy_tail,
596                                      lcr_current);
597     }
598   }
599
600   GNUNET_free (set);
601 }
602
603
604 /**
605  * Clean up after a client has disconnected
606  *
607  * @param cls closure, unused
608  * @param client the client to clean up after
609  */
610 static void
611 handle_client_disconnect (void *cls,
612                           struct GNUNET_SERVER_Client *client)
613 {
614   struct Set *set;
615   struct Listener *listener;
616
617   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618               "client disconnected, cleaning up\n");
619   set = set_get (client);
620   if (NULL != set)
621   {
622     set->client = NULL;
623     set_destroy (set);
624     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
625                 "Client's set destroyed\n");
626   }
627   listener = listener_get (client);
628   if (NULL != listener)
629   {
630     listener->client = NULL;
631     listener_destroy (listener);
632     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633                 "Client's listener destroyed\n");
634   }
635 }
636
637
638 /**
639  * Destroy an incoming request from a remote peer
640  *
641  * @param incoming remote request to destroy
642  */
643 static void
644 incoming_destroy (struct Operation *incoming)
645 {
646   struct GNUNET_CADET_Channel *channel;
647
648   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
649   GNUNET_CONTAINER_DLL_remove (incoming_head,
650                                incoming_tail,
651                                incoming);
652   if (NULL != incoming->timeout_task)
653   {
654     GNUNET_SCHEDULER_cancel (incoming->timeout_task);
655     incoming->timeout_task = NULL;
656   }
657   /* make sure that the tunnel end handler will not destroy us again */
658   incoming->vt = NULL;
659   if (NULL != incoming->spec)
660   {
661     GNUNET_free (incoming->spec);
662     incoming->spec = NULL;
663   }
664   if (NULL != incoming->mq)
665   {
666     GNUNET_MQ_destroy (incoming->mq);
667     incoming->mq = NULL;
668   }
669   if (NULL != (channel = incoming->channel))
670   {
671     incoming->channel = NULL;
672     GNUNET_CADET_channel_destroy (channel);
673   }
674 }
675
676
677 /**
678  * Find a listener that is interested in the given operation type
679  * and application id.
680  *
681  * @param op operation type to look for
682  * @param app_id application id to look for
683  * @return a matching listener, or NULL if no listener matches the
684  *         given operation and application id
685  */
686 static struct Listener *
687 listener_get_by_target (enum GNUNET_SET_OperationType op,
688                         const struct GNUNET_HashCode *app_id)
689 {
690   struct Listener *listener;
691
692   for (listener = listeners_head; NULL != listener; listener = listener->next)
693     if ( (listener->operation == op) &&
694          (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) )
695       return listener;
696   return NULL;
697 }
698
699
700 /**
701  * Suggest the given request to the listener. The listening client can
702  * then accept or reject the remote request.
703  *
704  * @param incoming the incoming peer with the request to suggest
705  * @param listener the listener to suggest the request to
706  */
707 static void
708 incoming_suggest (struct Operation *incoming,
709                   struct Listener *listener)
710 {
711   struct GNUNET_MQ_Envelope *mqm;
712   struct GNUNET_SET_RequestMessage *cmsg;
713
714   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
715   GNUNET_assert (NULL != incoming->spec);
716   GNUNET_assert (0 == incoming->suggest_id);
717   incoming->suggest_id = suggest_id++;
718   if (0 == suggest_id)
719     suggest_id++;
720   GNUNET_assert (NULL != incoming->timeout_task);
721   GNUNET_SCHEDULER_cancel (incoming->timeout_task);
722   incoming->timeout_task = NULL;
723   mqm = GNUNET_MQ_msg_nested_mh (cmsg,
724                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
725                                  incoming->spec->context_msg);
726   GNUNET_assert (NULL != mqm);
727   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728               "Suggesting incoming request with accept id %u to listener\n",
729               incoming->suggest_id);
730   cmsg->accept_id = htonl (incoming->suggest_id);
731   cmsg->peer_id = incoming->spec->peer;
732   GNUNET_MQ_send (listener->client_mq, mqm);
733 }
734
735
736 /**
737  * Handle a request for a set operation from another peer.  Checks if we
738  * have a listener waiting for such a request (and in that case initiates
739  * asking the listener about accepting the connection). If no listener
740  * is waiting, we queue the operation request in hope that a listener
741  * shows up soon (before timeout).
742  *
743  * This msg is expected as the first and only msg handled through the
744  * non-operation bound virtual table, acceptance of this operation replaces
745  * our virtual table and subsequent msgs would be routed differently (as
746  * we then know what type of operation this is).
747  *
748  * @param op the operation state
749  * @param mh the received message
750  * @return #GNUNET_OK if the channel should be kept alive,
751  *         #GNUNET_SYSERR to destroy the channel
752  */
753 static int
754 handle_incoming_msg (struct Operation *op,
755                      const struct GNUNET_MessageHeader *mh)
756 {
757   const struct OperationRequestMessage *msg;
758   struct Listener *listener;
759   struct OperationSpecification *spec;
760   const struct GNUNET_MessageHeader *nested_context;
761
762   msg = (const struct OperationRequestMessage *) mh;
763   GNUNET_assert (GNUNET_YES == op->is_incoming);
764   if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
765   {
766     GNUNET_break_op (0);
767     return GNUNET_SYSERR;
768   }
769   /* double operation request */
770   if (NULL != op->spec)
771   {
772     GNUNET_break_op (0);
773     return GNUNET_SYSERR;
774   }
775   spec = GNUNET_new (struct OperationSpecification);
776   nested_context = GNUNET_MQ_extract_nested_mh (msg);
777   if ( (NULL != nested_context) &&
778        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
779   {
780     GNUNET_break_op (0);
781     GNUNET_free (spec);
782     return GNUNET_SYSERR;
783   }
784   /* Make a copy of the nested_context (application-specific context
785      information that is opaque to set) so we can pass it to the
786      listener later on */
787   if (NULL != nested_context)
788     spec->context_msg = GNUNET_copy_message (nested_context);
789   spec->operation = ntohl (msg->operation);
790   spec->app_id = msg->app_id;
791   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
792                                          UINT32_MAX);
793   spec->peer = op->peer;
794   spec->remote_element_count = ntohl (msg->element_count);
795   op->spec = spec;
796
797   listener = listener_get_by_target (ntohl (msg->operation),
798                                      &msg->app_id);
799   if (NULL == listener)
800   {
801     GNUNET_break (NULL != op->timeout_task);
802     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
803                 "No matching listener for incoming request (op %u, app %s), waiting with timeout\n",
804                 ntohl (msg->operation),
805                 GNUNET_h2s (&msg->app_id));
806     return GNUNET_OK;
807   }
808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809               "Received P2P operation request (op %u, app %s) for active listener\n",
810               ntohl (msg->operation),
811               GNUNET_h2s (&msg->app_id));
812   incoming_suggest (op, listener);
813   return GNUNET_OK;
814 }
815
816
817 static void
818 execute_add (struct Set *set,
819              const struct GNUNET_MessageHeader *m)
820 {
821   const struct GNUNET_SET_ElementMessage *msg;
822   struct GNUNET_SET_Element el;
823   struct ElementEntry *ee;
824   struct GNUNET_HashCode hash;
825
826   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type));
827
828   msg = (const struct GNUNET_SET_ElementMessage *) m;
829   el.size = ntohs (m->size) - sizeof *msg;
830   el.data = &msg[1];
831   el.element_type = ntohs (msg->element_type);
832   GNUNET_SET_element_hash (&el, &hash);
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Client inserts element %s of size %u\n",
835               GNUNET_h2s (&hash),
836               el.size);
837
838   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
839                                           &hash);
840
841   if (NULL == ee)
842   {
843     ee = GNUNET_malloc (el.size + sizeof *ee);
844     ee->element.size = el.size;
845     GNUNET_memcpy (&ee[1],
846             el.data,
847             el.size);
848     ee->element.data = &ee[1];
849     ee->element.element_type = el.element_type;
850     ee->remote = GNUNET_NO;
851     ee->mutations = NULL;
852     ee->mutations_size = 0;
853     ee->element_hash = hash;
854     GNUNET_break (GNUNET_YES ==
855                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
856                                                      &ee->element_hash,
857                                                      ee,
858                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
859   }
860   else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
861   {
862     /* same element inserted twice */
863     return;
864   }
865
866   {
867     struct MutationEvent mut = {
868       .generation = set->current_generation,
869       .added = GNUNET_YES
870     };
871     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
872   }
873
874   set->vt->add (set->state, ee);
875 }
876
877
878 static void
879 execute_remove (struct Set *set,
880                 const struct GNUNET_MessageHeader *m)
881 {
882   const struct GNUNET_SET_ElementMessage *msg;
883   struct GNUNET_SET_Element el;
884   struct ElementEntry *ee;
885   struct GNUNET_HashCode hash;
886
887   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type));
888
889   msg = (const struct GNUNET_SET_ElementMessage *) m;
890   el.size = ntohs (m->size) - sizeof *msg;
891   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892               "Client removes element of size %u\n",
893               el.size);
894   el.data = &msg[1];
895   el.element_type = ntohs (msg->element_type);
896   GNUNET_SET_element_hash (&el, &hash);
897   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
898                                           &hash);
899   if (NULL == ee)
900   {
901     /* Client tried to remove non-existing element. */
902     return;
903   }
904   if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
905   {
906     /* Client tried to remove element twice */
907     return;
908   }
909   else
910   {
911     struct MutationEvent mut = {
912       .generation = set->current_generation,
913       .added = GNUNET_NO
914     };
915     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
916   }
917   set->vt->remove (set->state, ee);
918 }
919
920
921
922 static void
923 execute_mutation (struct Set *set,
924                   const struct GNUNET_MessageHeader *m)
925 {
926   switch (ntohs (m->type))
927   {
928     case GNUNET_MESSAGE_TYPE_SET_ADD:
929       execute_add (set, m);
930       break;
931     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
932       execute_remove (set, m);
933       break;
934     default:
935       GNUNET_break (0);
936   }
937 }
938
939
940
941 /**
942  * Send the next element of a set to the set's client.  The next element is given by
943  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
944  * are no more elements in the set.  The caller must ensure that the set's iterator is
945  * valid.
946  *
947  * The client will acknowledge each received element with a
948  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
949  * #handle_client_iter_ack() will then trigger the next transmission.
950  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
951  *
952  * @param set set that should send its next element to its client
953  */
954 static void
955 send_client_element (struct Set *set)
956 {
957   int ret;
958   struct ElementEntry *ee;
959   struct GNUNET_MQ_Envelope *ev;
960   struct GNUNET_SET_IterResponseMessage *msg;
961
962   GNUNET_assert (NULL != set->iter);
963
964 again:
965
966   ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
967                                                      NULL,
968                                                      (const void **) &ee);
969   if (GNUNET_NO == ret)
970   {
971     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972                 "Iteration on %p done.\n",
973                 (void *) set);
974     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
975     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
976     set->iter = NULL;
977     set->iteration_id++;
978
979     GNUNET_assert (set->content->iterator_count > 0);
980     set->content->iterator_count -= 1;
981
982     if (0 == set->content->iterator_count)
983     {
984       while (NULL != set->content->pending_mutations_head)
985       {
986         struct PendingMutation *pm;
987
988         pm = set->content->pending_mutations_head;
989         GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
990                                      set->content->pending_mutations_tail,
991                                      pm);
992         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993                     "Executing pending mutation on %p.\n",
994                     (void *) pm->set);
995         execute_mutation (pm->set, pm->mutation_message);
996         GNUNET_free (pm->mutation_message);
997         GNUNET_free (pm);
998       }
999     }
1000
1001   }
1002   else
1003   {
1004     GNUNET_assert (NULL != ee);
1005
1006     if (GNUNET_NO == is_element_of_iteration (ee, set))
1007       goto again;
1008
1009     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                 "Sending iteration element on %p.\n",
1011                 (void *) set);
1012     ev = GNUNET_MQ_msg_extra (msg,
1013                               ee->element.size,
1014                               GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1015     GNUNET_memcpy (&msg[1],
1016             ee->element.data,
1017             ee->element.size);
1018     msg->element_type = htons (ee->element.element_type);
1019     msg->iteration_id = htons (set->iteration_id);
1020   }
1021   GNUNET_MQ_send (set->client_mq, ev);
1022 }
1023
1024
1025 /**
1026  * Called when a client wants to iterate the elements of a set.
1027  * Checks if we have a set associated with the client and if we
1028  * can right now start an iteration. If all checks out, starts
1029  * sending the elements of the set to the client.
1030  *
1031  * @param cls unused
1032  * @param client client that sent the message
1033  * @param m message sent by the client
1034  */
1035 static void
1036 handle_client_iterate (void *cls,
1037                        struct GNUNET_SERVER_Client *client,
1038                        const struct GNUNET_MessageHeader *m)
1039 {
1040   struct Set *set;
1041
1042   set = set_get (client);
1043   if (NULL == set)
1044   {
1045     /* attempt to iterate over a non existing set */
1046     GNUNET_break (0);
1047     GNUNET_SERVER_client_disconnect (client);
1048     return;
1049   }
1050   if (NULL != set->iter)
1051   {
1052     /* Only one concurrent iterate-action allowed per set */
1053     GNUNET_break (0);
1054     GNUNET_SERVER_client_disconnect (client);
1055     return;
1056   }
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058               "Iterating set %p in gen %u with %u content elements\n",
1059               (void *) set,
1060               set->current_generation,
1061               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1062   GNUNET_SERVER_receive_done (client,
1063                               GNUNET_OK);
1064   set->content->iterator_count += 1;
1065   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1066   set->iter_generation = set->current_generation;
1067   send_client_element (set);
1068 }
1069
1070
1071 /**
1072  * Called when a client wants to create a new set.  This is typically
1073  * the first request from a client, and includes the type of set
1074  * operation to be performed.
1075  *
1076  * @param cls unused
1077  * @param client client that sent the message
1078  * @param m message sent by the client
1079  */
1080 static void
1081 handle_client_create_set (void *cls,
1082                           struct GNUNET_SERVER_Client *client,
1083                           const struct GNUNET_MessageHeader *m)
1084 {
1085   const struct GNUNET_SET_CreateMessage *msg;
1086   struct Set *set;
1087
1088   msg = (const struct GNUNET_SET_CreateMessage *) m;
1089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090               "Client created new set (operation %u)\n",
1091               ntohl (msg->operation));
1092   if (NULL != set_get (client))
1093   {
1094     /* There can only be one set per client */
1095     GNUNET_break (0);
1096     GNUNET_SERVER_client_disconnect (client);
1097     return;
1098   }
1099   set = GNUNET_new (struct Set);
1100   switch (ntohl (msg->operation))
1101   {
1102   case GNUNET_SET_OPERATION_INTERSECTION:
1103     set->vt = _GSS_intersection_vt ();
1104     break;
1105   case GNUNET_SET_OPERATION_UNION:
1106     set->vt = _GSS_union_vt ();
1107     break;
1108   default:
1109     GNUNET_free (set);
1110     GNUNET_break (0);
1111     GNUNET_SERVER_client_disconnect (client);
1112     return;
1113   }
1114   set->operation = ntohl (msg->operation);
1115   set->state = set->vt->create ();
1116   if (NULL == set->state)
1117   {
1118     /* initialization failed (i.e. out of memory) */
1119     GNUNET_free (set);
1120     GNUNET_SERVER_client_disconnect (client);
1121     return;
1122   }
1123   set->content = GNUNET_new (struct SetContent);
1124   set->content->refcount = 1;
1125   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1126   set->client = client;
1127   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
1128   GNUNET_CONTAINER_DLL_insert (sets_head,
1129                                sets_tail,
1130                                set);
1131   GNUNET_SERVER_receive_done (client,
1132                               GNUNET_OK);
1133 }
1134
1135
1136 /**
1137  * Called when a client wants to create a new listener.
1138  *
1139  * @param cls unused
1140  * @param client client that sent the message
1141  * @param m message sent by the client
1142  */
1143 static void
1144 handle_client_listen (void *cls,
1145                       struct GNUNET_SERVER_Client *client,
1146                       const struct GNUNET_MessageHeader *m)
1147 {
1148   const struct GNUNET_SET_ListenMessage *msg;
1149   struct Listener *listener;
1150   struct Operation *op;
1151
1152   msg = (const struct GNUNET_SET_ListenMessage *) m;
1153   if (NULL != listener_get (client))
1154   {
1155     /* max. one active listener per client! */
1156     GNUNET_break (0);
1157     GNUNET_SERVER_client_disconnect (client);
1158     return;
1159   }
1160   listener = GNUNET_new (struct Listener);
1161   listener->client = client;
1162   listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
1163   listener->app_id = msg->app_id;
1164   listener->operation = ntohl (msg->operation);
1165   GNUNET_CONTAINER_DLL_insert_tail (listeners_head,
1166                                     listeners_tail,
1167                                     listener);
1168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1169               "New listener created (op %u, app %s)\n",
1170               listener->operation,
1171               GNUNET_h2s (&listener->app_id));
1172
1173   /* check for existing incoming requests the listener might be interested in */
1174   for (op = incoming_head; NULL != op; op = op->next)
1175   {
1176     if (NULL == op->spec)
1177       continue; /* no details available yet */
1178     if (0 != op->suggest_id)
1179       continue; /* this one has been already suggested to a listener */
1180     if (listener->operation != op->spec->operation)
1181       continue; /* incompatible operation */
1182     if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
1183                                      &op->spec->app_id))
1184       continue; /* incompatible appliation */
1185     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                 "Found matching existing request\n");
1187     incoming_suggest (op,
1188                       listener);
1189   }
1190   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1191 }
1192
1193
1194 /**
1195  * Called when the listening client rejects an operation
1196  * request by another peer.
1197  *
1198  * @param cls unused
1199  * @param client client that sent the message
1200  * @param m message sent by the client
1201  */
1202 static void
1203 handle_client_reject (void *cls,
1204                       struct GNUNET_SERVER_Client *client,
1205                       const struct GNUNET_MessageHeader *m)
1206 {
1207   struct Operation *incoming;
1208   const struct GNUNET_SET_RejectMessage *msg;
1209
1210   msg = (const struct GNUNET_SET_RejectMessage *) m;
1211   incoming = get_incoming (ntohl (msg->accept_reject_id));
1212   if (NULL == incoming)
1213   {
1214     /* no matching incoming operation for this reject */
1215     GNUNET_break (0);
1216     GNUNET_SERVER_receive_done (client,
1217                                 GNUNET_SYSERR);
1218     return;
1219   }
1220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221               "Peer request (op %u, app %s) rejected by client\n",
1222               incoming->spec->operation,
1223               GNUNET_h2s (&incoming->spec->app_id));
1224   GNUNET_CADET_channel_destroy (incoming->channel);
1225   GNUNET_SERVER_receive_done (client,
1226                               GNUNET_OK);
1227 }
1228
1229
1230
1231 /**
1232  * Called when a client wants to add or remove an element to a set it inhabits.
1233  *
1234  * @param cls unused
1235  * @param client client that sent the message
1236  * @param m message sent by the client
1237  */
1238 static void
1239 handle_client_mutation (void *cls,
1240                         struct GNUNET_SERVER_Client *client,
1241                         const struct GNUNET_MessageHeader *m)
1242 {
1243   struct Set *set;
1244
1245   set = set_get (client);
1246   if (NULL == set)
1247   {
1248     /* client without a set requested an operation */
1249     GNUNET_break (0);
1250     GNUNET_SERVER_client_disconnect (client);
1251     return;
1252   }
1253
1254   GNUNET_SERVER_receive_done (client,
1255                               GNUNET_OK);
1256
1257   if (0 != set->content->iterator_count)
1258   {
1259     struct PendingMutation *pm;
1260
1261     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262                 "Scheduling mutation on set\n");
1263
1264     pm = GNUNET_new (struct PendingMutation);
1265     pm->mutation_message = GNUNET_copy_message (m);
1266     pm->set = set;
1267     GNUNET_CONTAINER_DLL_insert (set->content->pending_mutations_head,
1268                                  set->content->pending_mutations_tail,
1269                                  pm);
1270     return;
1271   }
1272
1273   execute_mutation (set, m);
1274 }
1275
1276
1277 /**
1278  * Advance the current generation of a set,
1279  * adding exclusion ranges if necessary.
1280  *
1281  * @param set the set where we want to advance the generation
1282  */
1283 static void
1284 advance_generation (struct Set *set)
1285 {
1286   struct GenerationRange r;
1287
1288   if (set->current_generation == set->content->latest_generation)
1289   {
1290     set->content->latest_generation += 1;
1291     set->current_generation += 1;
1292     return;
1293   }
1294
1295   GNUNET_assert (set->current_generation < set->content->latest_generation);
1296
1297   r.start = set->current_generation + 1;
1298   r.end = set->content->latest_generation + 1;
1299
1300   set->content->latest_generation = r.end;
1301   set->current_generation = r.end;
1302
1303   GNUNET_array_append (set->excluded_generations,
1304                        set->excluded_generations_size,
1305                        r);
1306 }
1307
1308 /**
1309  * Called when a client wants to initiate a set operation with another
1310  * peer.  Initiates the CADET connection to the listener and sends the
1311  * request.
1312  *
1313  * @param cls unused
1314  * @param client client that sent the message
1315  * @param m message sent by the client
1316  */
1317 static void
1318 handle_client_evaluate (void *cls,
1319                         struct GNUNET_SERVER_Client *client,
1320                         const struct GNUNET_MessageHeader *m)
1321 {
1322   struct Set *set;
1323   const struct GNUNET_SET_EvaluateMessage *msg;
1324   struct OperationSpecification *spec;
1325   struct Operation *op;
1326   const struct GNUNET_MessageHeader *context;
1327
1328   set = set_get (client);
1329   if (NULL == set)
1330   {
1331     GNUNET_break (0);
1332     GNUNET_SERVER_client_disconnect (client);
1333     return;
1334   }
1335   msg = (const struct GNUNET_SET_EvaluateMessage *) m;
1336   spec = GNUNET_new (struct OperationSpecification);
1337   spec->operation = set->operation;
1338   spec->app_id = msg->app_id;
1339   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1340                                          UINT32_MAX);
1341   spec->peer = msg->target_peer;
1342   spec->set = set;
1343   spec->result_mode = ntohl (msg->result_mode);
1344   spec->client_request_id = ntohl (msg->request_id);
1345   context = GNUNET_MQ_extract_nested_mh (msg);
1346   op = GNUNET_new (struct Operation);
1347   op->spec = spec;
1348
1349   // Advance generation values, so that
1350   // mutations won't interfer with the running operation.
1351   op->generation_created = set->current_generation;
1352   advance_generation (set);
1353
1354   op->vt = set->vt;
1355   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1356                                set->ops_tail,
1357                                op);
1358   op->channel = GNUNET_CADET_channel_create (cadet,
1359                                              op,
1360                                              &msg->target_peer,
1361                                              GC_u2h (GNUNET_APPLICATION_TYPE_SET),
1362                                              GNUNET_CADET_OPTION_RELIABLE);
1363   op->mq = GNUNET_CADET_mq_create (op->channel);
1364   set->vt->evaluate (op,
1365                      context);
1366   GNUNET_SERVER_receive_done (client,
1367                               GNUNET_OK);
1368 }
1369
1370
1371 /**
1372  * Handle an ack from a client, and send the next element. Note
1373  * that we only expect acks for set elements, not after the
1374  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1375  *
1376  * @param cls unused
1377  * @param client the client
1378  * @param m the message
1379  */
1380 static void
1381 handle_client_iter_ack (void *cls,
1382                         struct GNUNET_SERVER_Client *client,
1383                         const struct GNUNET_MessageHeader *m)
1384 {
1385   const struct GNUNET_SET_IterAckMessage *ack;
1386   struct Set *set;
1387
1388   set = set_get (client);
1389   if (NULL == set)
1390   {
1391     /* client without a set acknowledged receiving a value */
1392     GNUNET_break (0);
1393     GNUNET_SERVER_client_disconnect (client);
1394     return;
1395   }
1396   if (NULL == set->iter)
1397   {
1398     /* client sent an ack, but we were not expecting one (as
1399        set iteration has finished) */
1400     GNUNET_break (0);
1401     GNUNET_SERVER_client_disconnect (client);
1402     return;
1403   }
1404   ack = (const struct GNUNET_SET_IterAckMessage *) m;
1405   GNUNET_SERVER_receive_done (client,
1406                               GNUNET_OK);
1407   if (ntohl (ack->send_more))
1408   {
1409     send_client_element (set);
1410   }
1411   else
1412   {
1413     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1414     set->iter = NULL;
1415     set->iteration_id++;
1416   }
1417 }
1418
1419
1420 /**
1421  * Handle a request from the client to
1422  * copy a set.
1423  *
1424  * @param cls unused
1425  * @param client the client
1426  * @param mh the message
1427  */
1428 static void
1429 handle_client_copy_lazy_prepare (void *cls,
1430                                  struct GNUNET_SERVER_Client *client,
1431                                  const struct GNUNET_MessageHeader *mh)
1432 {
1433   struct Set *set;
1434   struct LazyCopyRequest *cr;
1435   struct GNUNET_MQ_Envelope *ev;
1436   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1437
1438   set = set_get (client);
1439   if (NULL == set)
1440   {
1441     /* client without a set requested an operation */
1442     GNUNET_break (0);
1443     GNUNET_SERVER_client_disconnect (client);
1444     return;
1445   }
1446
1447   cr = GNUNET_new (struct LazyCopyRequest);
1448
1449   cr->cookie = lazy_copy_cookie;
1450   lazy_copy_cookie += 1;
1451   cr->source_set = set;
1452
1453   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1454                                lazy_copy_tail,
1455                                cr);
1456
1457
1458   ev = GNUNET_MQ_msg (resp_msg,
1459                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1460   resp_msg->cookie = cr->cookie;
1461   GNUNET_MQ_send (set->client_mq, ev);
1462
1463
1464   GNUNET_SERVER_receive_done (client,
1465                               GNUNET_OK);
1466
1467   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1468               "Client requested lazy copy\n");
1469 }
1470
1471
1472 /**
1473  * Handle a request from the client to
1474  * connect to a copy of a set.
1475  *
1476  * @param cls unused
1477  * @param client the client
1478  * @param mh the message
1479  */
1480 static void
1481 handle_client_copy_lazy_connect (void *cls,
1482                                  struct GNUNET_SERVER_Client *client,
1483                                  const struct GNUNET_MessageHeader *mh)
1484 {
1485   struct LazyCopyRequest *cr;
1486   const struct GNUNET_SET_CopyLazyConnectMessage *msg =
1487       (const struct GNUNET_SET_CopyLazyConnectMessage *) mh;
1488   struct Set *set;
1489   int found;
1490
1491   if (NULL != set_get (client))
1492   {
1493     /* There can only be one set per client */
1494     GNUNET_break (0);
1495     GNUNET_SERVER_client_disconnect (client);
1496     return;
1497   }
1498
1499   found = GNUNET_NO;
1500
1501   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1502   {
1503     if (cr->cookie == msg->cookie)
1504     {
1505       found = GNUNET_YES;
1506       break;
1507     }
1508   }
1509
1510   if (GNUNET_NO == found)
1511   {
1512     /* client asked for copy with cookie we don't know */
1513     GNUNET_break (0);
1514     GNUNET_SERVER_client_disconnect (client);
1515     return;
1516   }
1517
1518   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1519                                lazy_copy_tail,
1520                                cr);
1521
1522   set = GNUNET_new (struct Set);
1523
1524   switch (cr->source_set->operation)
1525   {
1526   case GNUNET_SET_OPERATION_INTERSECTION:
1527     set->vt = _GSS_intersection_vt ();
1528     break;
1529   case GNUNET_SET_OPERATION_UNION:
1530     set->vt = _GSS_union_vt ();
1531     break;
1532   default:
1533     GNUNET_assert (0);
1534     return;
1535   }
1536
1537   if (NULL == set->vt->copy_state) {
1538     /* Lazy copy not supported for this set operation */
1539     GNUNET_break (0);
1540     GNUNET_free (set);
1541     GNUNET_free (cr);
1542     GNUNET_SERVER_client_disconnect (client);
1543     return;
1544   }
1545
1546   set->operation = cr->source_set->operation;
1547   set->state = set->vt->copy_state (cr->source_set);
1548   set->content = cr->source_set->content;
1549   set->content->refcount += 1;
1550
1551   set->current_generation = cr->source_set->current_generation;
1552   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1553   set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations,
1554                                              set->excluded_generations_size * sizeof (struct GenerationRange));
1555
1556   /* Advance the generation of the new set, so that mutations to the
1557      of the cloned set and the source set are independent. */
1558   advance_generation (set);
1559
1560
1561   set->client = client;
1562   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
1563   GNUNET_CONTAINER_DLL_insert (sets_head,
1564                                sets_tail,
1565                                set);
1566
1567   GNUNET_free (cr);
1568
1569   GNUNET_SERVER_receive_done (client,
1570                               GNUNET_OK);
1571
1572   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1573               "Client connected to lazy set\n");
1574 }
1575
1576
1577 /**
1578  * Handle a request from the client to
1579  * cancel a running set operation.
1580  *
1581  * @param cls unused
1582  * @param client the client
1583  * @param mh the message
1584  */
1585 static void
1586 handle_client_cancel (void *cls,
1587                       struct GNUNET_SERVER_Client *client,
1588                       const struct GNUNET_MessageHeader *mh)
1589 {
1590   const struct GNUNET_SET_CancelMessage *msg =
1591       (const struct GNUNET_SET_CancelMessage *) mh;
1592   struct Set *set;
1593   struct Operation *op;
1594   int found;
1595
1596   set = set_get (client);
1597   if (NULL == set)
1598   {
1599     /* client without a set requested an operation */
1600     GNUNET_break (0);
1601     GNUNET_SERVER_client_disconnect (client);
1602     return;
1603   }
1604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1605               "Client requested cancel for op %u\n",
1606               ntohl (msg->request_id));
1607   found = GNUNET_NO;
1608   for (op = set->ops_head; NULL != op; op = op->next)
1609   {
1610     if (op->spec->client_request_id == ntohl (msg->request_id))
1611     {
1612       found = GNUNET_YES;
1613       break;
1614     }
1615   }
1616   if (GNUNET_NO == found)
1617   {
1618     /* It may happen that the operation was already destroyed due to
1619      * the other peer disconnecting.  The client may not know about this
1620      * yet and try to cancel the (just barely non-existent) operation.
1621      * So this is not a hard error.
1622      */
1623     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1624                 "Client canceled non-existent op\n");
1625   }
1626   else
1627   {
1628     _GSS_operation_destroy (op,
1629                             GNUNET_YES);
1630   }
1631   GNUNET_SERVER_receive_done (client,
1632                               GNUNET_OK);
1633 }
1634
1635
1636 /**
1637  * Handle a request from the client to accept a set operation that
1638  * came from a remote peer.  We forward the accept to the associated
1639  * operation for handling
1640  *
1641  * @param cls unused
1642  * @param client the client
1643  * @param mh the message
1644  */
1645 static void
1646 handle_client_accept (void *cls,
1647                       struct GNUNET_SERVER_Client *client,
1648                       const struct GNUNET_MessageHeader *mh)
1649 {
1650   struct Set *set;
1651   const struct GNUNET_SET_AcceptMessage *msg;
1652   struct Operation *op;
1653   struct GNUNET_SET_ResultMessage *result_message;
1654   struct GNUNET_MQ_Envelope *ev;
1655
1656   msg = (const struct GNUNET_SET_AcceptMessage *) mh;
1657   set = set_get (client);
1658   if (NULL == set)
1659   {
1660     /* client without a set requested to accept */
1661     GNUNET_break (0);
1662     GNUNET_SERVER_client_disconnect (client);
1663     return;
1664   }
1665   op = get_incoming (ntohl (msg->accept_reject_id));
1666   if (NULL == op)
1667   {
1668     /* It is not an error if the set op does not exist -- it may
1669      * have been destroyed when the partner peer disconnected. */
1670     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1671                 "Client accepted request that is no longer active\n");
1672     ev = GNUNET_MQ_msg (result_message,
1673                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1674     result_message->request_id = msg->request_id;
1675     result_message->element_type = 0;
1676     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1677     GNUNET_MQ_send (set->client_mq, ev);
1678     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1679     return;
1680   }
1681
1682   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683               "Client accepting request %u\n",
1684               ntohl (msg->accept_reject_id));
1685   GNUNET_assert (GNUNET_YES == op->is_incoming);
1686   op->is_incoming = GNUNET_NO;
1687   GNUNET_CONTAINER_DLL_remove (incoming_head,
1688                                incoming_tail,
1689                                op);
1690   op->spec->set = set;
1691   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1692                                set->ops_tail,
1693                                op);
1694   op->spec->client_request_id = ntohl (msg->request_id);
1695   op->spec->result_mode = ntohl (msg->result_mode);
1696
1697   // Advance generation values, so that
1698   // mutations won't interfer with the running operation.
1699   op->generation_created = set->current_generation;
1700   advance_generation (set);
1701
1702   op->vt = set->vt;
1703   op->vt->accept (op);
1704   GNUNET_SERVER_receive_done (client,
1705                               GNUNET_OK);
1706 }
1707
1708
1709 /**
1710  * Called to clean up, after a shutdown has been requested.
1711  *
1712  * @param cls closure
1713  */
1714 static void
1715 shutdown_task (void *cls)
1716 {
1717   while (NULL != incoming_head)
1718     incoming_destroy (incoming_head);
1719   while (NULL != listeners_head)
1720     listener_destroy (listeners_head);
1721   while (NULL != sets_head)
1722     set_destroy (sets_head);
1723
1724   /* it's important to destroy cadet at the end, as all channels
1725    * must be destroyed before the cadet handle! */
1726   if (NULL != cadet)
1727   {
1728     GNUNET_CADET_disconnect (cadet);
1729     cadet = NULL;
1730   }
1731   GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
1732   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1733               "handled shutdown request\n");
1734 }
1735
1736
1737 /**
1738  * Timeout happens iff:
1739  *  - we suggested an operation to our listener,
1740  *    but did not receive a response in time
1741  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1742  *
1743  * @param cls channel context
1744  * @param tc context information (why was this task triggered now)
1745  */
1746 static void
1747 incoming_timeout_cb (void *cls)
1748 {
1749   struct Operation *incoming = cls;
1750
1751   incoming->timeout_task = NULL;
1752   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754               "Remote peer's incoming request timed out\n");
1755   incoming_destroy (incoming);
1756 }
1757
1758
1759 /**
1760  * Terminates an incoming operation in case we have not yet received an
1761  * operation request. Called by the channel destruction handler.
1762  *
1763  * @param op the channel context
1764  */
1765 static void
1766 handle_incoming_disconnect (struct Operation *op)
1767 {
1768   GNUNET_assert (GNUNET_YES == op->is_incoming);
1769   /* channel is already dead, incoming_destroy must not
1770    * destroy it ... */
1771   op->channel = NULL;
1772   incoming_destroy (op);
1773   op->vt = NULL;
1774 }
1775
1776
1777 /**
1778  * Method called whenever another peer has added us to a channel the
1779  * other peer initiated.  Only called (once) upon reception of data
1780  * with a message type which was subscribed to in
1781  * GNUNET_CADET_connect().
1782  *
1783  * The channel context represents the operation itself and gets added to a DLL,
1784  * from where it gets looked up when our local listener client responds
1785  * to a proposed/suggested operation or connects and associates with this operation.
1786  *
1787  * @param cls closure
1788  * @param channel new handle to the channel
1789  * @param initiator peer that started the channel
1790  * @param port Port this channel is for.
1791  * @param options Unused.
1792  * @return initial channel context for the channel
1793  *         returns NULL on error
1794  */
1795 static void *
1796 channel_new_cb (void *cls,
1797                 struct GNUNET_CADET_Channel *channel,
1798                 const struct GNUNET_PeerIdentity *initiator,
1799                 const struct GNUNET_HashCode *port,
1800                 enum GNUNET_CADET_ChannelOption options)
1801 {
1802   static const struct SetVT incoming_vt = {
1803     .msg_handler = &handle_incoming_msg,
1804     .peer_disconnect = &handle_incoming_disconnect
1805   };
1806   struct Operation *incoming;
1807
1808   if (0 != memcmp (GC_u2h (GNUNET_APPLICATION_TYPE_SET), port, sizeof (*port)))
1809   {
1810     GNUNET_break (0);
1811     GNUNET_CADET_channel_destroy (channel);
1812     return NULL;
1813   }
1814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1815               "New incoming channel\n");
1816   incoming = GNUNET_new (struct Operation);
1817   incoming->is_incoming = GNUNET_YES;
1818   incoming->peer = *initiator;
1819   incoming->channel = channel;
1820   incoming->mq = GNUNET_CADET_mq_create (incoming->channel);
1821   incoming->vt = &incoming_vt;
1822   incoming->timeout_task
1823     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1824                                     &incoming_timeout_cb,
1825                                     incoming);
1826   GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
1827                                     incoming_tail,
1828                                     incoming);
1829   return incoming;
1830 }
1831
1832
1833 /**
1834  * Function called whenever a channel is destroyed.  Should clean up
1835  * any associated state.  It must NOT call
1836  * GNUNET_CADET_channel_destroy() on the channel.
1837  *
1838  * The peer_disconnect function is part of a a virtual table set initially either
1839  * when a peer creates a new channel with us (#channel_new_cb()), or once we create
1840  * a new channel ourselves (evaluate).
1841  *
1842  * Once we know the exact type of operation (union/intersection), the vt is
1843  * replaced with an operation specific instance (_GSS_[op]_vt).
1844  *
1845  * @param cls closure (set from GNUNET_CADET_connect())
1846  * @param channel connection to the other end (henceforth invalid)
1847  * @param channel_ctx place where local state associated
1848  *                   with the channel is stored
1849  */
1850 static void
1851 channel_end_cb (void *cls,
1852                 const struct GNUNET_CADET_Channel *channel,
1853                 void *channel_ctx)
1854 {
1855   struct Operation *op = channel_ctx;
1856
1857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1858               "channel_end_cb called\n");
1859   op->channel = NULL;
1860   op->keep++;
1861   /* the vt can be null if a client already requested canceling op. */
1862   if (NULL != op->vt)
1863   {
1864     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1865                 "calling peer disconnect due to channel end\n");
1866     op->vt->peer_disconnect (op);
1867   }
1868   op->keep--;
1869   if (0 == op->keep)
1870   {
1871     /* cadet will never call us with the context again! */
1872     GNUNET_free (op);
1873   }
1874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1875               "channel_end_cb finished\n");
1876 }
1877
1878
1879 /**
1880  * Functions with this signature are called whenever a message is
1881  * received via a cadet channel.
1882  *
1883  * The msg_handler is a virtual table set in initially either when a peer
1884  * creates a new channel with us (channel_new_cb), or once we create a new channel
1885  * ourselves (evaluate).
1886  *
1887  * Once we know the exact type of operation (union/intersection), the vt is
1888  * replaced with an operation specific instance (_GSS_[op]_vt).
1889  *
1890  * @param cls Closure (set from GNUNET_CADET_connect()).
1891  * @param channel Connection to the other end.
1892  * @param channel_ctx Place to store local state associated with the channel.
1893  * @param message The actual message.
1894  * @return #GNUNET_OK to keep the channel open,
1895  *         #GNUNET_SYSERR to close it (signal serious error).
1896  */
1897 static int
1898 dispatch_p2p_message (void *cls,
1899                       struct GNUNET_CADET_Channel *channel,
1900                       void **channel_ctx,
1901                       const struct GNUNET_MessageHeader *message)
1902 {
1903   struct Operation *op = *channel_ctx;
1904   int ret;
1905
1906   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1907               "Dispatching cadet message (type: %u)\n",
1908               ntohs (message->type));
1909   /* do this before the handler, as the handler might kill the channel */
1910   GNUNET_CADET_receive_done (channel);
1911   if (NULL != op->vt)
1912     ret = op->vt->msg_handler (op,
1913                                message);
1914   else
1915     ret = GNUNET_SYSERR;
1916   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917               "Handled cadet message (type: %u)\n",
1918               ntohs (message->type));
1919   return ret;
1920 }
1921
1922
1923 /**
1924  * Function called by the service's run
1925  * method to run service-specific setup code.
1926  *
1927  * @param cls closure
1928  * @param server the initialized server
1929  * @param cfg configuration to use
1930  */
1931 static void
1932 run (void *cls,
1933      struct GNUNET_SERVER_Handle *server,
1934      const struct GNUNET_CONFIGURATION_Handle *cfg)
1935 {
1936   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1937     { &handle_client_accept, NULL,
1938       GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1939       sizeof (struct GNUNET_SET_AcceptMessage)},
1940     { &handle_client_iter_ack, NULL,
1941       GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1942       sizeof (struct GNUNET_SET_IterAckMessage) },
1943     { &handle_client_mutation, NULL,
1944       GNUNET_MESSAGE_TYPE_SET_ADD,
1945       0},
1946     { &handle_client_create_set, NULL,
1947       GNUNET_MESSAGE_TYPE_SET_CREATE,
1948       sizeof (struct GNUNET_SET_CreateMessage)},
1949     { &handle_client_iterate, NULL,
1950       GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1951       sizeof (struct GNUNET_MessageHeader)},
1952     { &handle_client_evaluate, NULL,
1953       GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1954       0},
1955     { &handle_client_listen, NULL,
1956       GNUNET_MESSAGE_TYPE_SET_LISTEN,
1957       sizeof (struct GNUNET_SET_ListenMessage)},
1958     { &handle_client_reject, NULL,
1959       GNUNET_MESSAGE_TYPE_SET_REJECT,
1960       sizeof (struct GNUNET_SET_RejectMessage)},
1961     { &handle_client_mutation, NULL,
1962       GNUNET_MESSAGE_TYPE_SET_REMOVE,
1963       0},
1964     { &handle_client_cancel, NULL,
1965       GNUNET_MESSAGE_TYPE_SET_CANCEL,
1966       sizeof (struct GNUNET_SET_CancelMessage)},
1967     { &handle_client_copy_lazy_prepare, NULL,
1968       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1969       sizeof (struct GNUNET_MessageHeader)},
1970     { &handle_client_copy_lazy_connect, NULL,
1971       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1972       sizeof (struct GNUNET_SET_CopyLazyConnectMessage)},
1973     { NULL, NULL, 0, 0}
1974   };
1975   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1976     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
1977     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
1978     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
1979     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0},
1980     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0},
1981     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0},
1982     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
1983     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
1984     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
1985     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 0},
1986     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
1987     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
1988     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0},
1989     {NULL, 0, 0}
1990   };
1991
1992   configuration = cfg;
1993   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1994   GNUNET_SERVER_disconnect_notify (server,
1995                                    &handle_client_disconnect, NULL);
1996   GNUNET_SERVER_add_handlers (server,
1997                               server_handlers);
1998   _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
1999   cadet = GNUNET_CADET_connect (cfg, NULL,
2000                                 &channel_end_cb,
2001                                 cadet_handlers);
2002   GNUNET_CADET_open_port (cadet,
2003                           GC_u2h (GNUNET_APPLICATION_TYPE_SET),
2004                           &channel_new_cb, NULL);
2005   if (NULL == cadet)
2006   {
2007     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2008                 _("Could not connect to cadet service\n"));
2009     return;
2010   }
2011 }
2012
2013
2014 /**
2015  * The main function for the set service.
2016  *
2017  * @param argc number of arguments from the command line
2018  * @param argv command line arguments
2019  * @return 0 ok, 1 on error
2020  */
2021 int
2022 main (int argc,
2023       char *const *argv)
2024 {
2025   int ret;
2026
2027   ret = GNUNET_SERVICE_run (argc, argv, "set",
2028                             GNUNET_SERVICE_OPTION_NONE,
2029                             &run, NULL);
2030   return (GNUNET_OK == ret) ? 0 : 1;
2031 }
2032
2033 /* end of gnunet-service-set.c */