cfddef6fbf62c3bc4ae2ba7369b2d0a3c01e4a7c
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_protocol.h"
28 #include "gnunet_statistics_service.h"
29
30 /**
31  * How long do we hold on to an incoming channel if there is
32  * no local listener before giving up?
33  */
34 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35
36 /**
37  * A listener is inhabited by a client, and waits for evaluation
38  * requests from remote peers.
39  */
40 struct Listener
41 {
42   /**
43    * Listeners are held in a doubly linked list.
44    */
45   struct Listener *next;
46
47   /**
48    * Listeners are held in a doubly linked list.
49    */
50   struct Listener *prev;
51
52   /**
53    * Client that owns the listener.
54    * Only one client may own a listener.
55    */
56   struct GNUNET_SERVER_Client *client;
57
58   /**
59    * Message queue for the client
60    */
61   struct GNUNET_MQ_Handle *client_mq;
62
63   /**
64    * Application ID for the operation, used to distinguish
65    * multiple operations of the same type with the same peer.
66    */
67   struct GNUNET_HashCode app_id;
68
69   /**
70    * The type of the operation.
71    */
72   enum GNUNET_SET_OperationType operation;
73 };
74
75
76 struct LazyCopyRequest
77 {
78   struct Set *source_set;
79   uint32_t cookie;
80
81   struct LazyCopyRequest *prev;
82   struct LazyCopyRequest *next;
83 };
84
85
86 /**
87  * Configuration of our local peer.
88  */
89 static const struct GNUNET_CONFIGURATION_Handle *configuration;
90
91 /**
92  * Handle to the cadet service, used to listen for and connect to
93  * remote peers.
94  */
95 static struct GNUNET_CADET_Handle *cadet;
96
97 /**
98  * Sets are held in a doubly linked list.
99  */
100 static struct Set *sets_head;
101
102 /**
103  * Sets are held in a doubly linked list.
104  */
105 static struct Set *sets_tail;
106
107 /**
108  * Listeners are held in a doubly linked list.
109  */
110 static struct Listener *listeners_head;
111
112 /**
113  * Listeners are held in a doubly linked list.
114  */
115 static struct Listener *listeners_tail;
116
117 /**
118  * Incoming sockets from remote peers are held in a doubly linked
119  * list.
120  */
121 static struct Operation *incoming_head;
122
123 /**
124  * Incoming sockets from remote peers are held in a doubly linked
125  * list.
126  */
127 static struct Operation *incoming_tail;
128
129 static struct LazyCopyRequest *lazy_copy_head;
130 static struct LazyCopyRequest *lazy_copy_tail;
131
132 static uint32_t lazy_copy_cookie = 1;
133
134 /**
135  * Counter for allocating unique IDs for clients, used to identify
136  * incoming operation requests from remote peers, that the client can
137  * choose to accept or refuse.
138  */
139 static uint32_t suggest_id = 1;
140
141 /**
142  * Statistics handle.
143  */
144 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
145
146
147 /**
148  * Get set that is owned by the given client, if any.
149  *
150  * @param client client to look for
151  * @return set that the client owns, NULL if the client
152  *         does not own a set
153  */
154 static struct Set *
155 set_get (struct GNUNET_SERVER_Client *client)
156 {
157   struct Set *set;
158
159   for (set = sets_head; NULL != set; set = set->next)
160     if (set->client == client)
161       return set;
162   return NULL;
163 }
164
165
166 /**
167  * Get the listener associated with the given client, if any.
168  *
169  * @param client the client
170  * @return listener associated with the client, NULL
171  *         if there isn't any
172  */
173 static struct Listener *
174 listener_get (struct GNUNET_SERVER_Client *client)
175 {
176   struct Listener *listener;
177
178   for (listener = listeners_head; NULL != listener; listener = listener->next)
179     if (listener->client == client)
180       return listener;
181   return NULL;
182 }
183
184
185 /**
186  * Get the incoming socket associated with the given id.
187  *
188  * @param id id to look for
189  * @return the incoming socket associated with the id,
190  *         or NULL if there is none
191  */
192 static struct Operation *
193 get_incoming (uint32_t id)
194 {
195   struct Operation *op;
196
197   for (op = incoming_head; NULL != op; op = op->next)
198     if (op->suggest_id == id)
199     {
200       GNUNET_assert (GNUNET_YES == op->is_incoming);
201       return op;
202     }
203   return NULL;
204 }
205
206
207 /**
208  * Destroy a listener, free all resources associated with it.
209  *
210  * @param listener listener to destroy
211  */
212 static void
213 listener_destroy (struct Listener *listener)
214 {
215   /* If the client is not dead yet, destroy it.
216    * The client's destroy callback will destroy the listener again. */
217   if (NULL != listener->client)
218   {
219     struct GNUNET_SERVER_Client *client = listener->client;
220
221     listener->client = NULL;
222     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
223                 "Disconnecting listener client\n");
224     GNUNET_SERVER_client_disconnect (client);
225     return;
226   }
227   if (NULL != listener->client_mq)
228   {
229     GNUNET_MQ_destroy (listener->client_mq);
230     listener->client_mq = NULL;
231   }
232   GNUNET_CONTAINER_DLL_remove (listeners_head,
233                                listeners_tail,
234                                listener);
235   GNUNET_free (listener);
236 }
237
238
239 /**
240  * Context for the #garbage_collect_cb().
241  */
242 struct GarbageContext
243 {
244
245   /**
246    * Map for which we are garbage collecting removed elements.
247    */
248   struct GNUNET_CONTAINER_MultiHashMap *map;
249
250   /**
251    * Lowest generation for which an operation is still pending.
252    */
253   unsigned int min_op_generation;
254
255   /**
256    * Largest generation for which an operation is still pending.
257    */
258   unsigned int max_op_generation;
259
260 };
261
262
263 /**
264  * Function invoked to check if an element can be removed from
265  * the set's history because it is no longer needed.
266  *
267  * @param cls the `struct GarbageContext *`
268  * @param key key of the element in the map
269  * @param value the `struct ElementEntry *`
270  * @return #GNUNET_OK (continue to iterate)
271  */
272 static int
273 garbage_collect_cb (void *cls,
274                     const struct GNUNET_HashCode *key,
275                     void *value)
276 {
277   //struct GarbageContext *gc = cls;
278   //struct ElementEntry *ee = value;
279
280   //if (GNUNET_YES != ee->removed)
281   //  return GNUNET_OK;
282   //if ( (gc->max_op_generation < ee->generation_added) ||
283   //     (ee->generation_removed > gc->min_op_generation) )
284   //{
285   //  GNUNET_assert (GNUNET_YES ==
286   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
287   //                                                       key,
288   //                                                       ee));
289   //  GNUNET_free (ee);
290   //}
291   return GNUNET_OK;
292 }
293
294
295 /**
296  * Collect and destroy elements that are not needed anymore, because
297  * their lifetime (as determined by their generation) does not overlap
298  * with any active set operation.
299  *
300  * @param set set to garbage collect
301  */
302 static void
303 collect_generation_garbage (struct Set *set)
304 {
305   struct Operation *op;
306   struct GarbageContext gc;
307
308   gc.min_op_generation = UINT_MAX;
309   gc.max_op_generation = 0;
310   for (op = set->ops_head; NULL != op; op = op->next)
311   {
312     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
313                                        op->generation_created);
314     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
315                                        op->generation_created);
316   }
317   gc.map = set->content->elements;
318   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
319                                          &garbage_collect_cb,
320                                          &gc);
321 }
322
323 int
324 is_excluded_generation (unsigned int generation,
325                         struct GenerationRange *excluded,
326                         unsigned int excluded_size)
327 {
328   unsigned int i;
329
330   for (i = 0; i < excluded_size; i++)
331   {
332     if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
333       return GNUNET_YES;
334   }
335
336   return GNUNET_NO;
337 }
338
339
340 int
341 is_element_of_generation (struct ElementEntry *ee,
342                           unsigned int query_generation,
343                           struct GenerationRange *excluded,
344                           unsigned int excluded_size)
345 {
346   struct MutationEvent *mut;
347   int is_present;
348   unsigned int i;
349
350   GNUNET_assert (NULL != ee->mutations);
351
352   if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size))
353   {
354     GNUNET_break (0);
355     return GNUNET_NO;
356   }
357
358   is_present = GNUNET_NO;
359
360   /* Could be made faster with binary search, but lists
361      are small, so why bother. */
362   for (i = 0; i < ee->mutations_size; i++)
363   {
364     mut = &ee->mutations[i];
365
366     if (mut->generation > query_generation)
367     {
368       /* The mutation doesn't apply to our generation
369          anymore.  We can'b break here, since mutations aren't
370          sorted by generation. */
371       continue;
372     }
373
374     if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size))
375     {
376       /* The generation is excluded (because it belongs to another
377          fork via a lazy copy) and thus mutations aren't considered
378          for membership testing. */
379       continue;
380     }
381
382     /* This would be an inconsistency in how we manage mutations. */
383     if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) )
384       GNUNET_assert (0);
385
386     /* Likewise. */
387     if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) )
388       GNUNET_assert (0);
389
390     is_present = mut->added;
391   }
392
393   return is_present;
394 }
395
396
397 int
398 _GSS_is_element_of_set (struct ElementEntry *ee,
399                         struct Set *set)
400 {
401   return is_element_of_generation (ee,
402                                    set->current_generation,
403                                    set->excluded_generations,
404                                    set->excluded_generations_size);
405 }
406
407
408 static int
409 is_element_of_iteration (struct ElementEntry *ee,
410                          struct Set *set)
411 {
412   return is_element_of_generation (ee,
413                                    set->iter_generation,
414                                    set->excluded_generations,
415                                    set->excluded_generations_size);
416 }
417
418
419 int
420 _GSS_is_element_of_operation (struct ElementEntry *ee,
421                               struct Operation *op)
422 {
423   return is_element_of_generation (ee,
424                                    op->generation_created,
425                                    op->spec->set->excluded_generations,
426                                    op->spec->set->excluded_generations_size);
427 }
428
429
430 /**
431  * Destroy the given operation.  Call the implementation-specific
432  * cancel function of the operation.  Disconnects from the remote
433  * peer.  Does not disconnect the client, as there may be multiple
434  * operations per set.
435  *
436  * @param op operation to destroy
437  * @param gc #GNUNET_YES to perform garbage collection on the set
438  */
439 void
440 _GSS_operation_destroy (struct Operation *op,
441                         int gc)
442 {
443   struct Set *set;
444   struct GNUNET_CADET_Channel *channel;
445
446   if (NULL == op->vt)
447   {
448     /* already in #_GSS_operation_destroy() */
449     return;
450   }
451   GNUNET_assert (GNUNET_NO == op->is_incoming);
452   GNUNET_assert (NULL != op->spec);
453   set = op->spec->set;
454   GNUNET_CONTAINER_DLL_remove (set->ops_head,
455                                set->ops_tail,
456                                op);
457   op->vt->cancel (op);
458   op->vt = NULL;
459   if (NULL != op->spec)
460   {
461     if (NULL != op->spec->context_msg)
462     {
463       GNUNET_free (op->spec->context_msg);
464       op->spec->context_msg = NULL;
465     }
466     GNUNET_free (op->spec);
467     op->spec = NULL;
468   }
469   if (NULL != op->mq)
470   {
471     GNUNET_MQ_destroy (op->mq);
472     op->mq = NULL;
473   }
474   if (NULL != (channel = op->channel))
475   {
476     op->channel = NULL;
477     GNUNET_CADET_channel_destroy (channel);
478   }
479   if (GNUNET_YES == gc)
480     collect_generation_garbage (set);
481   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
482    * there was a channel end handler that will free 'op' on the call stack. */
483 }
484
485
486 /**
487  * Iterator over hash map entries to free element entries.
488  *
489  * @param cls closure
490  * @param key current key code
491  * @param value a `struct ElementEntry *` to be free'd
492  * @return #GNUNET_YES (continue to iterate)
493  */
494 static int
495 destroy_elements_iterator (void *cls,
496                            const struct GNUNET_HashCode *key,
497                            void *value)
498 {
499   struct ElementEntry *ee = value;
500
501   GNUNET_free_non_null (ee->mutations);
502
503   GNUNET_free (ee);
504   return GNUNET_YES;
505 }
506
507
508 /**
509  * Destroy a set, and free all resources and operations associated with it.
510  *
511  * @param set the set to destroy
512  */
513 static void
514 set_destroy (struct Set *set)
515 {
516   if (NULL != set->client)
517   {
518     /* If the client is not dead yet, destroy it.  The client's destroy
519      * callback will call `set_destroy()` again in this case.  We do
520      * this so that the channel end handler still has a valid set handle
521      * to destroy. */
522     struct GNUNET_SERVER_Client *client = set->client;
523
524     set->client = NULL;
525     GNUNET_SERVER_client_disconnect (client);
526     return;
527   }
528   GNUNET_assert (NULL != set->state);
529   while (NULL != set->ops_head)
530     _GSS_operation_destroy (set->ops_head, GNUNET_NO);
531   set->vt->destroy_set (set->state);
532   set->state = NULL;
533   if (NULL != set->client_mq)
534   {
535     GNUNET_MQ_destroy (set->client_mq);
536     set->client_mq = NULL;
537   }
538   if (NULL != set->iter)
539   {
540     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
541     set->iter = NULL;
542     set->iteration_id++;
543   }
544   {
545     struct SetContent *content;
546     struct PendingMutation *pm;
547     struct PendingMutation *pm_current;
548
549     content = set->content;
550
551     // discard any pending mutations that reference this set
552     pm = content->pending_mutations_head;
553     while (NULL != pm)
554     {
555       pm_current = pm;
556       pm = pm->next;
557       if (pm_current-> set == set)
558         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
559                                      content->pending_mutations_tail,
560                                      pm_current);
561
562     }
563
564     set->content = NULL;
565     GNUNET_assert (0 != content->refcount);
566     content->refcount -= 1;
567     if (0 == content->refcount)
568     {
569       GNUNET_assert (NULL != content->elements);
570       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
571                                              &destroy_elements_iterator,
572                                              NULL);
573       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
574       content->elements = NULL;
575       GNUNET_free (content);
576     }
577   }
578   GNUNET_free_non_null (set->excluded_generations);
579   set->excluded_generations = NULL;
580   GNUNET_CONTAINER_DLL_remove (sets_head,
581                                sets_tail,
582                                set);
583
584   // remove set from pending copy requests
585   {
586     struct LazyCopyRequest *lcr;
587     lcr = lazy_copy_head;
588     while (NULL != lcr)
589     {
590       struct LazyCopyRequest *lcr_current;
591       lcr_current = lcr;
592       lcr = lcr->next;
593       if (lcr_current->source_set == set)
594         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
595                                      lazy_copy_tail,
596                                      lcr_current);
597     }
598   }
599
600   GNUNET_free (set);
601 }
602
603
604 /**
605  * Clean up after a client has disconnected
606  *
607  * @param cls closure, unused
608  * @param client the client to clean up after
609  */
610 static void
611 handle_client_disconnect (void *cls,
612                           struct GNUNET_SERVER_Client *client)
613 {
614   struct Set *set;
615   struct Listener *listener;
616
617   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618               "client disconnected, cleaning up\n");
619   set = set_get (client);
620   if (NULL != set)
621   {
622     set->client = NULL;
623     set_destroy (set);
624     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
625                 "Client's set destroyed\n");
626   }
627   listener = listener_get (client);
628   if (NULL != listener)
629   {
630     listener->client = NULL;
631     listener_destroy (listener);
632     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633                 "Client's listener destroyed\n");
634   }
635 }
636
637
638 /**
639  * Destroy an incoming request from a remote peer
640  *
641  * @param incoming remote request to destroy
642  */
643 static void
644 incoming_destroy (struct Operation *incoming)
645 {
646   struct GNUNET_CADET_Channel *channel;
647
648   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
649   GNUNET_CONTAINER_DLL_remove (incoming_head,
650                                incoming_tail,
651                                incoming);
652   if (NULL != incoming->timeout_task)
653   {
654     GNUNET_SCHEDULER_cancel (incoming->timeout_task);
655     incoming->timeout_task = NULL;
656   }
657   /* make sure that the tunnel end handler will not destroy us again */
658   incoming->vt = NULL;
659   if (NULL != incoming->spec)
660   {
661     GNUNET_free (incoming->spec);
662     incoming->spec = NULL;
663   }
664   if (NULL != incoming->mq)
665   {
666     GNUNET_MQ_destroy (incoming->mq);
667     incoming->mq = NULL;
668   }
669   if (NULL != (channel = incoming->channel))
670   {
671     incoming->channel = NULL;
672     GNUNET_CADET_channel_destroy (channel);
673   }
674 }
675
676
677 /**
678  * Find a listener that is interested in the given operation type
679  * and application id.
680  *
681  * @param op operation type to look for
682  * @param app_id application id to look for
683  * @return a matching listener, or NULL if no listener matches the
684  *         given operation and application id
685  */
686 static struct Listener *
687 listener_get_by_target (enum GNUNET_SET_OperationType op,
688                         const struct GNUNET_HashCode *app_id)
689 {
690   struct Listener *listener;
691
692   for (listener = listeners_head; NULL != listener; listener = listener->next)
693     if ( (listener->operation == op) &&
694          (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) )
695       return listener;
696   return NULL;
697 }
698
699
700 /**
701  * Suggest the given request to the listener. The listening client can
702  * then accept or reject the remote request.
703  *
704  * @param incoming the incoming peer with the request to suggest
705  * @param listener the listener to suggest the request to
706  */
707 static void
708 incoming_suggest (struct Operation *incoming,
709                   struct Listener *listener)
710 {
711   struct GNUNET_MQ_Envelope *mqm;
712   struct GNUNET_SET_RequestMessage *cmsg;
713
714   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
715   GNUNET_assert (NULL != incoming->spec);
716   GNUNET_assert (0 == incoming->suggest_id);
717   incoming->suggest_id = suggest_id++;
718   if (0 == suggest_id)
719     suggest_id++;
720   GNUNET_assert (NULL != incoming->timeout_task);
721   GNUNET_SCHEDULER_cancel (incoming->timeout_task);
722   incoming->timeout_task = NULL;
723   mqm = GNUNET_MQ_msg_nested_mh (cmsg,
724                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
725                                  incoming->spec->context_msg);
726   GNUNET_assert (NULL != mqm);
727   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728               "Suggesting incoming request with accept id %u to listener\n",
729               incoming->suggest_id);
730   cmsg->accept_id = htonl (incoming->suggest_id);
731   cmsg->peer_id = incoming->spec->peer;
732   GNUNET_MQ_send (listener->client_mq, mqm);
733 }
734
735
736 /**
737  * Handle a request for a set operation from another peer.  Checks if we
738  * have a listener waiting for such a request (and in that case initiates
739  * asking the listener about accepting the connection). If no listener
740  * is waiting, we queue the operation request in hope that a listener
741  * shows up soon (before timeout).
742  *
743  * This msg is expected as the first and only msg handled through the
744  * non-operation bound virtual table, acceptance of this operation replaces
745  * our virtual table and subsequent msgs would be routed differently (as
746  * we then know what type of operation this is).
747  *
748  * @param op the operation state
749  * @param mh the received message
750  * @return #GNUNET_OK if the channel should be kept alive,
751  *         #GNUNET_SYSERR to destroy the channel
752  */
753 static int
754 handle_incoming_msg (struct Operation *op,
755                      const struct GNUNET_MessageHeader *mh)
756 {
757   const struct OperationRequestMessage *msg;
758   struct Listener *listener;
759   struct OperationSpecification *spec;
760   const struct GNUNET_MessageHeader *nested_context;
761
762   msg = (const struct OperationRequestMessage *) mh;
763   GNUNET_assert (GNUNET_YES == op->is_incoming);
764   if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
765   {
766     GNUNET_break_op (0);
767     return GNUNET_SYSERR;
768   }
769   /* double operation request */
770   if (NULL != op->spec)
771   {
772     GNUNET_break_op (0);
773     return GNUNET_SYSERR;
774   }
775   spec = GNUNET_new (struct OperationSpecification);
776   nested_context = GNUNET_MQ_extract_nested_mh (msg);
777   if ( (NULL != nested_context) &&
778        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
779   {
780     GNUNET_break_op (0);
781     GNUNET_free (spec);
782     return GNUNET_SYSERR;
783   }
784   /* Make a copy of the nested_context (application-specific context
785      information that is opaque to set) so we can pass it to the
786      listener later on */
787   if (NULL != nested_context)
788     spec->context_msg = GNUNET_copy_message (nested_context);
789   spec->operation = ntohl (msg->operation);
790   spec->app_id = msg->app_id;
791   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
792                                          UINT32_MAX);
793   spec->peer = op->peer;
794   spec->remote_element_count = ntohl (msg->element_count);
795   op->spec = spec;
796
797   listener = listener_get_by_target (ntohl (msg->operation),
798                                      &msg->app_id);
799   if (NULL == listener)
800   {
801     GNUNET_break (NULL != op->timeout_task);
802     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
803                 "No matching listener for incoming request (op %u, app %s), waiting with timeout\n",
804                 ntohl (msg->operation),
805                 GNUNET_h2s (&msg->app_id));
806     return GNUNET_OK;
807   }
808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809               "Received P2P operation request (op %u, app %s) for active listener\n",
810               ntohl (msg->operation),
811               GNUNET_h2s (&msg->app_id));
812   incoming_suggest (op, listener);
813   return GNUNET_OK;
814 }
815
816
817 static void
818 execute_add (struct Set *set,
819              const struct GNUNET_MessageHeader *m)
820 {
821   const struct GNUNET_SET_ElementMessage *msg;
822   struct GNUNET_SET_Element el;
823   struct ElementEntry *ee;
824   struct GNUNET_HashCode hash;
825
826   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type));
827
828   msg = (const struct GNUNET_SET_ElementMessage *) m;
829   el.size = ntohs (m->size) - sizeof *msg;
830   el.data = &msg[1];
831   el.element_type = ntohs (msg->element_type);
832   GNUNET_SET_element_hash (&el, &hash);
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Client inserts element %s of size %u\n",
835               GNUNET_h2s (&hash),
836               el.size);
837
838   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
839                                           &hash);
840
841   if (NULL == ee)
842   {
843     ee = GNUNET_malloc (el.size + sizeof *ee);
844     ee->element.size = el.size;
845     memcpy (&ee[1],
846             el.data,
847             el.size);
848     ee->element.data = &ee[1];
849     ee->element.element_type = el.element_type;
850     ee->remote = GNUNET_NO;
851     ee->mutations = NULL;
852     ee->mutations_size = 0;
853     ee->element_hash = hash;
854     GNUNET_break (GNUNET_YES ==
855                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
856                                                      &ee->element_hash,
857                                                      ee,
858                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
859   }
860   else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
861   {
862     /* same element inserted twice */
863     return;
864   }
865
866   {
867     struct MutationEvent mut = {
868       .generation = set->current_generation,
869       .added = GNUNET_YES
870     };
871     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
872   }
873
874   set->vt->add (set->state, ee);
875 }
876
877
878 static void
879 execute_remove (struct Set *set,
880                 const struct GNUNET_MessageHeader *m)
881 {
882   const struct GNUNET_SET_ElementMessage *msg;
883   struct GNUNET_SET_Element el;
884   struct ElementEntry *ee;
885   struct GNUNET_HashCode hash;
886
887   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type));
888
889   msg = (const struct GNUNET_SET_ElementMessage *) m;
890   el.size = ntohs (m->size) - sizeof *msg;
891   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892               "Client removes element of size %u\n",
893               el.size);
894   el.data = &msg[1];
895   el.element_type = ntohs (msg->element_type);
896   GNUNET_SET_element_hash (&el, &hash);
897   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
898                                           &hash);
899   if (NULL == ee)
900   {
901     /* Client tried to remove non-existing element. */
902     return;
903   }
904   if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
905   {
906     /* Client tried to remove element twice */
907     return;
908   }
909   else
910   {
911     struct MutationEvent mut = {
912       .generation = set->current_generation,
913       .added = GNUNET_NO
914     };
915     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
916   }
917   set->vt->remove (set->state, ee);
918 }
919
920
921
922 static void
923 execute_mutation (struct Set *set,
924                   const struct GNUNET_MessageHeader *m)
925 {
926   switch (ntohs (m->type))
927   {
928     case GNUNET_MESSAGE_TYPE_SET_ADD:
929       execute_add (set, m);
930       break;
931     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
932       execute_remove (set, m);
933       break;
934     default:
935       GNUNET_break (0);
936   }
937 }
938
939
940
941 /**
942  * Send the next element of a set to the set's client.  The next element is given by
943  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
944  * are no more elements in the set.  The caller must ensure that the set's iterator is
945  * valid.
946  *
947  * The client will acknowledge each received element with a
948  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
949  * #handle_client_iter_ack() will then trigger the next transmission.
950  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
951  *
952  * @param set set that should send its next element to its client
953  */
954 static void
955 send_client_element (struct Set *set)
956 {
957   int ret;
958   struct ElementEntry *ee;
959   struct GNUNET_MQ_Envelope *ev;
960   struct GNUNET_SET_IterResponseMessage *msg;
961
962   GNUNET_assert (NULL != set->iter);
963
964 again:
965
966   ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
967                                                      NULL,
968                                                      (const void **) &ee);
969   if (GNUNET_NO == ret)
970   {
971     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972                 "Iteration on %p done.\n",
973                 (void *) set);
974     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
975     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
976     set->iter = NULL;
977     set->iteration_id++;
978     
979     GNUNET_assert (set->content->iterator_count > 0);
980     set->content->iterator_count -= 1;
981
982     if (0 == set->content->iterator_count)
983     {
984       while (NULL != set->content->pending_mutations_head)
985       {
986         struct PendingMutation *pm;
987
988         pm = set->content->pending_mutations_head;
989         GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
990                                      set->content->pending_mutations_tail,
991                                      pm);
992         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993                     "Executing pending mutation on %p.\n",
994                     (void *) pm->set);
995         execute_mutation (pm->set, pm->mutation_message);
996         GNUNET_free (pm->mutation_message);
997         GNUNET_free (pm);
998       }
999     }
1000
1001   }
1002   else
1003   {
1004     GNUNET_assert (NULL != ee);
1005
1006     if (GNUNET_NO == is_element_of_iteration (ee, set))
1007       goto again;
1008
1009     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                 "Sending iteration element on %p.\n",
1011                 (void *) set);
1012     ev = GNUNET_MQ_msg_extra (msg,
1013                               ee->element.size,
1014                               GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1015     memcpy (&msg[1],
1016             ee->element.data,
1017             ee->element.size);
1018     msg->element_type = htons (ee->element.element_type);
1019     msg->iteration_id = htons (set->iteration_id);
1020   }
1021   GNUNET_MQ_send (set->client_mq, ev);
1022 }
1023
1024
1025 /**
1026  * Called when a client wants to iterate the elements of a set.
1027  * Checks if we have a set associated with the client and if we
1028  * can right now start an iteration. If all checks out, starts
1029  * sending the elements of the set to the client.
1030  *
1031  * @param cls unused
1032  * @param client client that sent the message
1033  * @param m message sent by the client
1034  */
1035 static void
1036 handle_client_iterate (void *cls,
1037                        struct GNUNET_SERVER_Client *client,
1038                        const struct GNUNET_MessageHeader *m)
1039 {
1040   struct Set *set;
1041
1042   set = set_get (client);
1043   if (NULL == set)
1044   {
1045     /* attempt to iterate over a non existing set */
1046     GNUNET_break (0);
1047     GNUNET_SERVER_client_disconnect (client);
1048     return;
1049   }
1050   if (NULL != set->iter)
1051   {
1052     /* Only one concurrent iterate-action allowed per set */
1053     GNUNET_break (0);
1054     GNUNET_SERVER_client_disconnect (client);
1055     return;
1056   }
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058               "Iterating set %p in gen %u with %u content elements\n",
1059               (void *) set,
1060               set->current_generation,
1061               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1062   GNUNET_SERVER_receive_done (client,
1063                               GNUNET_OK);
1064   set->content->iterator_count += 1;
1065   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1066   set->iter_generation = set->current_generation;
1067   send_client_element (set);
1068 }
1069
1070
1071 /**
1072  * Called when a client wants to create a new set.  This is typically
1073  * the first request from a client, and includes the type of set
1074  * operation to be performed.
1075  *
1076  * @param cls unused
1077  * @param client client that sent the message
1078  * @param m message sent by the client
1079  */
1080 static void
1081 handle_client_create_set (void *cls,
1082                           struct GNUNET_SERVER_Client *client,
1083                           const struct GNUNET_MessageHeader *m)
1084 {
1085   const struct GNUNET_SET_CreateMessage *msg;
1086   struct Set *set;
1087
1088   msg = (const struct GNUNET_SET_CreateMessage *) m;
1089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090               "Client created new set (operation %u)\n",
1091               ntohl (msg->operation));
1092   if (NULL != set_get (client))
1093   {
1094     /* There can only be one set per client */
1095     GNUNET_break (0);
1096     GNUNET_SERVER_client_disconnect (client);
1097     return;
1098   }
1099   set = GNUNET_new (struct Set);
1100   switch (ntohl (msg->operation))
1101   {
1102   case GNUNET_SET_OPERATION_INTERSECTION:
1103     set->vt = _GSS_intersection_vt ();
1104     break;
1105   case GNUNET_SET_OPERATION_UNION:
1106     set->vt = _GSS_union_vt ();
1107     break;
1108   default:
1109     GNUNET_free (set);
1110     GNUNET_break (0);
1111     GNUNET_SERVER_client_disconnect (client);
1112     return;
1113   }
1114   set->operation = ntohl (msg->operation);
1115   set->state = set->vt->create ();
1116   set->content = GNUNET_new (struct SetContent);
1117   set->content->refcount = 1;
1118   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1119   set->client = client;
1120   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
1121   GNUNET_CONTAINER_DLL_insert (sets_head,
1122                                sets_tail,
1123                                set);
1124   GNUNET_SERVER_receive_done (client,
1125                               GNUNET_OK);
1126 }
1127
1128
1129 /**
1130  * Called when a client wants to create a new listener.
1131  *
1132  * @param cls unused
1133  * @param client client that sent the message
1134  * @param m message sent by the client
1135  */
1136 static void
1137 handle_client_listen (void *cls,
1138                       struct GNUNET_SERVER_Client *client,
1139                       const struct GNUNET_MessageHeader *m)
1140 {
1141   const struct GNUNET_SET_ListenMessage *msg;
1142   struct Listener *listener;
1143   struct Operation *op;
1144
1145   msg = (const struct GNUNET_SET_ListenMessage *) m;
1146   if (NULL != listener_get (client))
1147   {
1148     /* max. one active listener per client! */
1149     GNUNET_break (0);
1150     GNUNET_SERVER_client_disconnect (client);
1151     return;
1152   }
1153   listener = GNUNET_new (struct Listener);
1154   listener->client = client;
1155   listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
1156   listener->app_id = msg->app_id;
1157   listener->operation = ntohl (msg->operation);
1158   GNUNET_CONTAINER_DLL_insert_tail (listeners_head,
1159                                     listeners_tail,
1160                                     listener);
1161   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162               "New listener created (op %u, app %s)\n",
1163               listener->operation,
1164               GNUNET_h2s (&listener->app_id));
1165
1166   /* check for existing incoming requests the listener might be interested in */
1167   for (op = incoming_head; NULL != op; op = op->next)
1168   {
1169     if (NULL == op->spec)
1170       continue; /* no details available yet */
1171     if (0 != op->suggest_id)
1172       continue; /* this one has been already suggested to a listener */
1173     if (listener->operation != op->spec->operation)
1174       continue; /* incompatible operation */
1175     if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
1176                                      &op->spec->app_id))
1177       continue; /* incompatible appliation */
1178     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1179                 "Found matching existing request\n");
1180     incoming_suggest (op,
1181                       listener);
1182   }
1183   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1184 }
1185
1186
1187 /**
1188  * Called when the listening client rejects an operation
1189  * request by another peer.
1190  *
1191  * @param cls unused
1192  * @param client client that sent the message
1193  * @param m message sent by the client
1194  */
1195 static void
1196 handle_client_reject (void *cls,
1197                       struct GNUNET_SERVER_Client *client,
1198                       const struct GNUNET_MessageHeader *m)
1199 {
1200   struct Operation *incoming;
1201   const struct GNUNET_SET_RejectMessage *msg;
1202
1203   msg = (const struct GNUNET_SET_RejectMessage *) m;
1204   incoming = get_incoming (ntohl (msg->accept_reject_id));
1205   if (NULL == incoming)
1206   {
1207     /* no matching incoming operation for this reject */
1208     GNUNET_break (0);
1209     GNUNET_SERVER_receive_done (client,
1210                                 GNUNET_SYSERR);
1211     return;
1212   }
1213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214               "Peer request (op %u, app %s) rejected by client\n",
1215               incoming->spec->operation,
1216               GNUNET_h2s (&incoming->spec->app_id));
1217   GNUNET_CADET_channel_destroy (incoming->channel);
1218   GNUNET_SERVER_receive_done (client,
1219                               GNUNET_OK);
1220 }
1221
1222
1223
1224 /**
1225  * Called when a client wants to add or remove an element to a set it inhabits.
1226  *
1227  * @param cls unused
1228  * @param client client that sent the message
1229  * @param m message sent by the client
1230  */
1231 static void
1232 handle_client_mutation (void *cls,
1233                         struct GNUNET_SERVER_Client *client,
1234                         const struct GNUNET_MessageHeader *m)
1235 {
1236   struct Set *set;
1237
1238   set = set_get (client);
1239   if (NULL == set)
1240   {
1241     /* client without a set requested an operation */
1242     GNUNET_break (0);
1243     GNUNET_SERVER_client_disconnect (client);
1244     return;
1245   }
1246
1247   GNUNET_SERVER_receive_done (client,
1248                               GNUNET_OK);
1249
1250   if (0 != set->content->iterator_count)
1251   {
1252     struct PendingMutation *pm;
1253
1254     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255                 "Scheduling mutation on set\n");
1256
1257     pm = GNUNET_new (struct PendingMutation);
1258     pm->mutation_message = GNUNET_copy_message (m);
1259     pm->set = set;
1260     GNUNET_CONTAINER_DLL_insert (set->content->pending_mutations_head,
1261                                  set->content->pending_mutations_tail,
1262                                  pm);
1263     return;
1264   }
1265
1266   execute_mutation (set, m);
1267 }
1268
1269
1270 /**
1271  * Advance the current generation of a set,
1272  * adding exclusion ranges if necessary.
1273  *
1274  * @param set the set where we want to advance the generation
1275  */
1276 static void
1277 advance_generation (struct Set *set)
1278 {
1279   struct GenerationRange r;
1280
1281   if (set->current_generation == set->content->latest_generation)
1282   {
1283     set->content->latest_generation += 1;
1284     set->current_generation += 1;
1285     return;
1286   }
1287
1288   GNUNET_assert (set->current_generation < set->content->latest_generation);
1289
1290   r.start = set->current_generation + 1;
1291   r.end = set->content->latest_generation + 1;
1292
1293   set->content->latest_generation = r.end;
1294   set->current_generation = r.end;
1295
1296   GNUNET_array_append (set->excluded_generations,
1297                        set->excluded_generations_size,
1298                        r);
1299 }
1300
1301 /**
1302  * Called when a client wants to initiate a set operation with another
1303  * peer.  Initiates the CADET connection to the listener and sends the
1304  * request.
1305  *
1306  * @param cls unused
1307  * @param client client that sent the message
1308  * @param m message sent by the client
1309  */
1310 static void
1311 handle_client_evaluate (void *cls,
1312                         struct GNUNET_SERVER_Client *client,
1313                         const struct GNUNET_MessageHeader *m)
1314 {
1315   struct Set *set;
1316   const struct GNUNET_SET_EvaluateMessage *msg;
1317   struct OperationSpecification *spec;
1318   struct Operation *op;
1319   const struct GNUNET_MessageHeader *context;
1320
1321   set = set_get (client);
1322   if (NULL == set)
1323   {
1324     GNUNET_break (0);
1325     GNUNET_SERVER_client_disconnect (client);
1326     return;
1327   }
1328   msg = (const struct GNUNET_SET_EvaluateMessage *) m;
1329   spec = GNUNET_new (struct OperationSpecification);
1330   spec->operation = set->operation;
1331   spec->app_id = msg->app_id;
1332   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1333                                          UINT32_MAX);
1334   spec->peer = msg->target_peer;
1335   spec->set = set;
1336   spec->result_mode = ntohl (msg->result_mode);
1337   spec->client_request_id = ntohl (msg->request_id);
1338   context = GNUNET_MQ_extract_nested_mh (msg);
1339   op = GNUNET_new (struct Operation);
1340   op->spec = spec;
1341
1342   // Advance generation values, so that
1343   // mutations won't interfer with the running operation.
1344   op->generation_created = set->current_generation;
1345   advance_generation (set);
1346
1347   op->vt = set->vt;
1348   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1349                                set->ops_tail,
1350                                op);
1351   op->channel = GNUNET_CADET_channel_create (cadet,
1352                                              op,
1353                                              &msg->target_peer,
1354                                              GNUNET_APPLICATION_TYPE_SET,
1355                                              GNUNET_CADET_OPTION_RELIABLE);
1356   op->mq = GNUNET_CADET_mq_create (op->channel);
1357   set->vt->evaluate (op,
1358                      context);
1359   GNUNET_SERVER_receive_done (client,
1360                               GNUNET_OK);
1361 }
1362
1363
1364 /**
1365  * Handle an ack from a client, and send the next element. Note
1366  * that we only expect acks for set elements, not after the
1367  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1368  *
1369  * @param cls unused
1370  * @param client the client
1371  * @param m the message
1372  */
1373 static void
1374 handle_client_iter_ack (void *cls,
1375                         struct GNUNET_SERVER_Client *client,
1376                         const struct GNUNET_MessageHeader *m)
1377 {
1378   const struct GNUNET_SET_IterAckMessage *ack;
1379   struct Set *set;
1380
1381   set = set_get (client);
1382   if (NULL == set)
1383   {
1384     /* client without a set acknowledged receiving a value */
1385     GNUNET_break (0);
1386     GNUNET_SERVER_client_disconnect (client);
1387     return;
1388   }
1389   if (NULL == set->iter)
1390   {
1391     /* client sent an ack, but we were not expecting one (as
1392        set iteration has finished) */
1393     GNUNET_break (0);
1394     GNUNET_SERVER_client_disconnect (client);
1395     return;
1396   }
1397   ack = (const struct GNUNET_SET_IterAckMessage *) m;
1398   GNUNET_SERVER_receive_done (client,
1399                               GNUNET_OK);
1400   if (ntohl (ack->send_more))
1401   {
1402     send_client_element (set);
1403   }
1404   else
1405   {
1406     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1407     set->iter = NULL;
1408     set->iteration_id++;
1409   }
1410 }
1411
1412
1413 /**
1414  * Handle a request from the client to
1415  * copy a set.
1416  *
1417  * @param cls unused
1418  * @param client the client
1419  * @param mh the message
1420  */
1421 static void
1422 handle_client_copy_lazy_prepare (void *cls,
1423                                  struct GNUNET_SERVER_Client *client,
1424                                  const struct GNUNET_MessageHeader *mh)
1425 {
1426   struct Set *set;
1427   struct LazyCopyRequest *cr;
1428   struct GNUNET_MQ_Envelope *ev;
1429   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1430
1431   set = set_get (client);
1432   if (NULL == set)
1433   {
1434     /* client without a set requested an operation */
1435     GNUNET_break (0);
1436     GNUNET_SERVER_client_disconnect (client);
1437     return;
1438   }
1439
1440   cr = GNUNET_new (struct LazyCopyRequest);
1441
1442   cr->cookie = lazy_copy_cookie;
1443   lazy_copy_cookie += 1;
1444   cr->source_set = set;
1445
1446   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1447                                lazy_copy_tail,
1448                                cr);
1449
1450
1451   ev = GNUNET_MQ_msg (resp_msg,
1452                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1453   resp_msg->cookie = cr->cookie;
1454   GNUNET_MQ_send (set->client_mq, ev);
1455
1456
1457   GNUNET_SERVER_receive_done (client,
1458                               GNUNET_OK);
1459
1460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1461               "Client requested lazy copy\n");
1462 }
1463
1464
1465 /**
1466  * Handle a request from the client to
1467  * connect to a copy of a set.
1468  *
1469  * @param cls unused
1470  * @param client the client
1471  * @param mh the message
1472  */
1473 static void
1474 handle_client_copy_lazy_connect (void *cls,
1475                                  struct GNUNET_SERVER_Client *client,
1476                                  const struct GNUNET_MessageHeader *mh)
1477 {
1478   struct LazyCopyRequest *cr;
1479   const struct GNUNET_SET_CopyLazyConnectMessage *msg =
1480       (const struct GNUNET_SET_CopyLazyConnectMessage *) mh;
1481   struct Set *set;
1482   int found;
1483
1484   if (NULL != set_get (client))
1485   {
1486     /* There can only be one set per client */
1487     GNUNET_break (0);
1488     GNUNET_SERVER_client_disconnect (client);
1489     return;
1490   }
1491
1492   found = GNUNET_NO;
1493
1494   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1495   {
1496     if (cr->cookie == msg->cookie)
1497     {
1498       found = GNUNET_YES;
1499       break;
1500     } 
1501   }
1502
1503   if (GNUNET_NO == found)
1504   {
1505     /* client asked for copy with cookie we don't know */
1506     GNUNET_break (0);
1507     GNUNET_SERVER_client_disconnect (client);
1508     return;
1509   }
1510
1511   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1512                                lazy_copy_tail,
1513                                cr);
1514
1515   set = GNUNET_new (struct Set);
1516
1517   switch (cr->source_set->operation)
1518   {
1519   case GNUNET_SET_OPERATION_INTERSECTION:
1520     set->vt = _GSS_intersection_vt ();
1521     break;
1522   case GNUNET_SET_OPERATION_UNION:
1523     set->vt = _GSS_union_vt ();
1524     break;
1525   default:
1526     GNUNET_assert (0);
1527     return;
1528   }
1529
1530   if (NULL == set->vt->copy_state) {
1531     /* Lazy copy not supported for this set operation */
1532     GNUNET_break (0);
1533     GNUNET_free (set);
1534     GNUNET_free (cr);
1535     GNUNET_SERVER_client_disconnect (client);
1536     return;
1537   }
1538
1539   set->operation = cr->source_set->operation;
1540   set->state = set->vt->copy_state (cr->source_set);
1541   set->content = cr->source_set->content;
1542   set->content->refcount += 1;
1543
1544   set->current_generation = cr->source_set->current_generation;
1545   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1546   set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations,
1547                                              set->excluded_generations_size * sizeof (struct GenerationRange));
1548
1549   /* Advance the generation of the new set, so that mutations to the
1550      of the cloned set and the source set are independent. */
1551   advance_generation (set);
1552
1553
1554   set->client = client;
1555   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
1556   GNUNET_CONTAINER_DLL_insert (sets_head,
1557                                sets_tail,
1558                                set);
1559
1560   GNUNET_free (cr);
1561
1562   GNUNET_SERVER_receive_done (client,
1563                               GNUNET_OK);
1564
1565   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1566               "Client connected to lazy set\n");
1567 }
1568
1569
1570 /**
1571  * Handle a request from the client to
1572  * cancel a running set operation.
1573  *
1574  * @param cls unused
1575  * @param client the client
1576  * @param mh the message
1577  */
1578 static void
1579 handle_client_cancel (void *cls,
1580                       struct GNUNET_SERVER_Client *client,
1581                       const struct GNUNET_MessageHeader *mh)
1582 {
1583   const struct GNUNET_SET_CancelMessage *msg =
1584       (const struct GNUNET_SET_CancelMessage *) mh;
1585   struct Set *set;
1586   struct Operation *op;
1587   int found;
1588
1589   set = set_get (client);
1590   if (NULL == set)
1591   {
1592     /* client without a set requested an operation */
1593     GNUNET_break (0);
1594     GNUNET_SERVER_client_disconnect (client);
1595     return;
1596   }
1597   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1598               "Client requested cancel for op %u\n",
1599               ntohl (msg->request_id));
1600   found = GNUNET_NO;
1601   for (op = set->ops_head; NULL != op; op = op->next)
1602   {
1603     if (op->spec->client_request_id == ntohl (msg->request_id))
1604     {
1605       found = GNUNET_YES;
1606       break;
1607     }
1608   }
1609   if (GNUNET_NO == found)
1610   {
1611     /* It may happen that the operation was already destroyed due to
1612      * the other peer disconnecting.  The client may not know about this
1613      * yet and try to cancel the (just barely non-existent) operation.
1614      * So this is not a hard error.
1615      */
1616     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617                 "Client canceled non-existent op\n");
1618   }
1619   else
1620   {
1621     _GSS_operation_destroy (op,
1622                             GNUNET_YES);
1623   }
1624   GNUNET_SERVER_receive_done (client,
1625                               GNUNET_OK);
1626 }
1627
1628
1629 /**
1630  * Handle a request from the client to accept a set operation that
1631  * came from a remote peer.  We forward the accept to the associated
1632  * operation for handling
1633  *
1634  * @param cls unused
1635  * @param client the client
1636  * @param mh the message
1637  */
1638 static void
1639 handle_client_accept (void *cls,
1640                       struct GNUNET_SERVER_Client *client,
1641                       const struct GNUNET_MessageHeader *mh)
1642 {
1643   struct Set *set;
1644   const struct GNUNET_SET_AcceptMessage *msg;
1645   struct Operation *op;
1646   struct GNUNET_SET_ResultMessage *result_message;
1647   struct GNUNET_MQ_Envelope *ev;
1648
1649   msg = (const struct GNUNET_SET_AcceptMessage *) mh;
1650   set = set_get (client);
1651   if (NULL == set)
1652   {
1653     /* client without a set requested to accept */
1654     GNUNET_break (0);
1655     GNUNET_SERVER_client_disconnect (client);
1656     return;
1657   }
1658   op = get_incoming (ntohl (msg->accept_reject_id));
1659   if (NULL == op)
1660   {
1661     /* It is not an error if the set op does not exist -- it may
1662      * have been destroyed when the partner peer disconnected. */
1663     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1664                 "Client accepted request that is no longer active\n");
1665     ev = GNUNET_MQ_msg (result_message,
1666                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1667     result_message->request_id = msg->request_id;
1668     result_message->element_type = 0;
1669     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1670     GNUNET_MQ_send (set->client_mq, ev);
1671     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1672     return;
1673   }
1674
1675   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1676               "Client accepting request %u\n",
1677               ntohl (msg->accept_reject_id));
1678   GNUNET_assert (GNUNET_YES == op->is_incoming);
1679   op->is_incoming = GNUNET_NO;
1680   GNUNET_CONTAINER_DLL_remove (incoming_head,
1681                                incoming_tail,
1682                                op);
1683   op->spec->set = set;
1684   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1685                                set->ops_tail,
1686                                op);
1687   op->spec->client_request_id = ntohl (msg->request_id);
1688   op->spec->result_mode = ntohl (msg->result_mode);
1689
1690   // Advance generation values, so that
1691   // mutations won't interfer with the running operation.
1692   op->generation_created = set->current_generation;
1693   advance_generation (set);
1694
1695   op->vt = set->vt;
1696   op->vt->accept (op);
1697   GNUNET_SERVER_receive_done (client,
1698                               GNUNET_OK);
1699 }
1700
1701
1702 /**
1703  * Called to clean up, after a shutdown has been requested.
1704  *
1705  * @param cls closure
1706  * @param tc context information (why was this task triggered now)
1707  */
1708 static void
1709 shutdown_task (void *cls,
1710                const struct GNUNET_SCHEDULER_TaskContext *tc)
1711 {
1712   while (NULL != incoming_head)
1713     incoming_destroy (incoming_head);
1714   while (NULL != listeners_head)
1715     listener_destroy (listeners_head);
1716   while (NULL != sets_head)
1717     set_destroy (sets_head);
1718
1719   /* it's important to destroy cadet at the end, as all channels
1720    * must be destroyed before the cadet handle! */
1721   if (NULL != cadet)
1722   {
1723     GNUNET_CADET_disconnect (cadet);
1724     cadet = NULL;
1725   }
1726   GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
1727   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1728               "handled shutdown request\n");
1729 }
1730
1731
1732 /**
1733  * Timeout happens iff:
1734  *  - we suggested an operation to our listener,
1735  *    but did not receive a response in time
1736  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1737  *  - shutdown (obviously)
1738  *
1739  * @param cls channel context
1740  * @param tc context information (why was this task triggered now)
1741  */
1742 static void
1743 incoming_timeout_cb (void *cls,
1744                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1745 {
1746   struct Operation *incoming = cls;
1747
1748   incoming->timeout_task = NULL;
1749   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1750   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1751     return;
1752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1753               "Remote peer's incoming request timed out\n");
1754   incoming_destroy (incoming);
1755 }
1756
1757
1758 /**
1759  * Terminates an incoming operation in case we have not yet received an
1760  * operation request. Called by the channel destruction handler.
1761  *
1762  * @param op the channel context
1763  */
1764 static void
1765 handle_incoming_disconnect (struct Operation *op)
1766 {
1767   GNUNET_assert (GNUNET_YES == op->is_incoming);
1768   /* channel is already dead, incoming_destroy must not
1769    * destroy it ... */
1770   op->channel = NULL;
1771   incoming_destroy (op);
1772   op->vt = NULL;
1773 }
1774
1775
1776 /**
1777  * Method called whenever another peer has added us to a channel the
1778  * other peer initiated.  Only called (once) upon reception of data
1779  * with a message type which was subscribed to in
1780  * GNUNET_CADET_connect().
1781  *
1782  * The channel context represents the operation itself and gets added to a DLL,
1783  * from where it gets looked up when our local listener client responds
1784  * to a proposed/suggested operation or connects and associates with this operation.
1785  *
1786  * @param cls closure
1787  * @param channel new handle to the channel
1788  * @param initiator peer that started the channel
1789  * @param port Port this channel is for.
1790  * @param options Unused.
1791  * @return initial channel context for the channel
1792  *         returns NULL on error
1793  */
1794 static void *
1795 channel_new_cb (void *cls,
1796                 struct GNUNET_CADET_Channel *channel,
1797                 const struct GNUNET_PeerIdentity *initiator,
1798                 uint32_t port,
1799                 enum GNUNET_CADET_ChannelOption options)
1800 {
1801   static const struct SetVT incoming_vt = {
1802     .msg_handler = &handle_incoming_msg,
1803     .peer_disconnect = &handle_incoming_disconnect
1804   };
1805   struct Operation *incoming;
1806
1807   if (GNUNET_APPLICATION_TYPE_SET != port)
1808   {
1809     GNUNET_break (0);
1810     GNUNET_CADET_channel_destroy (channel);
1811     return NULL;
1812   }
1813   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1814               "New incoming channel\n");
1815   incoming = GNUNET_new (struct Operation);
1816   incoming->is_incoming = GNUNET_YES;
1817   incoming->peer = *initiator;
1818   incoming->channel = channel;
1819   incoming->mq = GNUNET_CADET_mq_create (incoming->channel);
1820   incoming->vt = &incoming_vt;
1821   incoming->timeout_task
1822     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1823                                     &incoming_timeout_cb,
1824                                     incoming);
1825   GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
1826                                     incoming_tail,
1827                                     incoming);
1828   return incoming;
1829 }
1830
1831
1832 /**
1833  * Function called whenever a channel is destroyed.  Should clean up
1834  * any associated state.  It must NOT call
1835  * GNUNET_CADET_channel_destroy() on the channel.
1836  *
1837  * The peer_disconnect function is part of a a virtual table set initially either
1838  * when a peer creates a new channel with us (#channel_new_cb()), or once we create
1839  * a new channel ourselves (evaluate).
1840  *
1841  * Once we know the exact type of operation (union/intersection), the vt is
1842  * replaced with an operation specific instance (_GSS_[op]_vt).
1843  *
1844  * @param cls closure (set from GNUNET_CADET_connect())
1845  * @param channel connection to the other end (henceforth invalid)
1846  * @param channel_ctx place where local state associated
1847  *                   with the channel is stored
1848  */
1849 static void
1850 channel_end_cb (void *cls,
1851                 const struct GNUNET_CADET_Channel *channel,
1852                 void *channel_ctx)
1853 {
1854   struct Operation *op = channel_ctx;
1855
1856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1857               "channel_end_cb called\n");
1858   op->channel = NULL;
1859   op->keep++;
1860   /* the vt can be null if a client already requested canceling op. */
1861   if (NULL != op->vt)
1862   {
1863     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1864                 "calling peer disconnect due to channel end\n");
1865     op->vt->peer_disconnect (op);
1866   }
1867   op->keep--;
1868   if (0 == op->keep)
1869   {
1870     /* cadet will never call us with the context again! */
1871     GNUNET_free (op);
1872   }
1873   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1874               "channel_end_cb finished\n");
1875 }
1876
1877
1878 /**
1879  * Functions with this signature are called whenever a message is
1880  * received via a cadet channel.
1881  *
1882  * The msg_handler is a virtual table set in initially either when a peer
1883  * creates a new channel with us (channel_new_cb), or once we create a new channel
1884  * ourselves (evaluate).
1885  *
1886  * Once we know the exact type of operation (union/intersection), the vt is
1887  * replaced with an operation specific instance (_GSS_[op]_vt).
1888  *
1889  * @param cls Closure (set from GNUNET_CADET_connect()).
1890  * @param channel Connection to the other end.
1891  * @param channel_ctx Place to store local state associated with the channel.
1892  * @param message The actual message.
1893  * @return #GNUNET_OK to keep the channel open,
1894  *         #GNUNET_SYSERR to close it (signal serious error).
1895  */
1896 static int
1897 dispatch_p2p_message (void *cls,
1898                       struct GNUNET_CADET_Channel *channel,
1899                       void **channel_ctx,
1900                       const struct GNUNET_MessageHeader *message)
1901 {
1902   struct Operation *op = *channel_ctx;
1903   int ret;
1904
1905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906               "Dispatching cadet message (type: %u)\n",
1907               ntohs (message->type));
1908   /* do this before the handler, as the handler might kill the channel */
1909   GNUNET_CADET_receive_done (channel);
1910   if (NULL != op->vt)
1911     ret = op->vt->msg_handler (op,
1912                                message);
1913   else
1914     ret = GNUNET_SYSERR;
1915   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1916               "Handled cadet message (type: %u)\n",
1917               ntohs (message->type));
1918   return ret;
1919 }
1920
1921
1922 /**
1923  * Function called by the service's run
1924  * method to run service-specific setup code.
1925  *
1926  * @param cls closure
1927  * @param server the initialized server
1928  * @param cfg configuration to use
1929  */
1930 static void
1931 run (void *cls,
1932      struct GNUNET_SERVER_Handle *server,
1933      const struct GNUNET_CONFIGURATION_Handle *cfg)
1934 {
1935   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1936     { &handle_client_accept, NULL,
1937       GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1938       sizeof (struct GNUNET_SET_AcceptMessage)},
1939     { &handle_client_iter_ack, NULL,
1940       GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1941       sizeof (struct GNUNET_SET_IterAckMessage) },
1942     { &handle_client_mutation, NULL,
1943       GNUNET_MESSAGE_TYPE_SET_ADD,
1944       0},
1945     { &handle_client_create_set, NULL,
1946       GNUNET_MESSAGE_TYPE_SET_CREATE,
1947       sizeof (struct GNUNET_SET_CreateMessage)},
1948     { &handle_client_iterate, NULL,
1949       GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1950       sizeof (struct GNUNET_MessageHeader)},
1951     { &handle_client_evaluate, NULL,
1952       GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1953       0},
1954     { &handle_client_listen, NULL,
1955       GNUNET_MESSAGE_TYPE_SET_LISTEN,
1956       sizeof (struct GNUNET_SET_ListenMessage)},
1957     { &handle_client_reject, NULL,
1958       GNUNET_MESSAGE_TYPE_SET_REJECT,
1959       sizeof (struct GNUNET_SET_RejectMessage)},
1960     { &handle_client_mutation, NULL,
1961       GNUNET_MESSAGE_TYPE_SET_REMOVE,
1962       0},
1963     { &handle_client_cancel, NULL,
1964       GNUNET_MESSAGE_TYPE_SET_CANCEL,
1965       sizeof (struct GNUNET_SET_CancelMessage)},
1966     { &handle_client_copy_lazy_prepare, NULL,
1967       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1968       sizeof (struct GNUNET_MessageHeader)},
1969     { &handle_client_copy_lazy_connect, NULL,
1970       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1971       sizeof (struct GNUNET_SET_CopyLazyConnectMessage)},
1972     { NULL, NULL, 0, 0}
1973   };
1974   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1975     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
1976     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
1977     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
1978     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0},
1979     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0},
1980     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0},
1981     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
1982     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
1983     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
1984     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
1985     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
1986     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0},
1987     {NULL, 0, 0}
1988   };
1989   static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
1990
1991   configuration = cfg;
1992   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1993                                 &shutdown_task, NULL);
1994   GNUNET_SERVER_disconnect_notify (server,
1995                                    &handle_client_disconnect, NULL);
1996   GNUNET_SERVER_add_handlers (server,
1997                               server_handlers);
1998   _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
1999   cadet = GNUNET_CADET_connect (cfg, NULL,
2000                                 &channel_new_cb,
2001                                 &channel_end_cb,
2002                                 cadet_handlers,
2003                                 cadet_ports);
2004   if (NULL == cadet)
2005   {
2006     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2007                 _("Could not connect to cadet service\n"));
2008     return;
2009   }
2010 }
2011
2012
2013 /**
2014  * The main function for the set service.
2015  *
2016  * @param argc number of arguments from the command line
2017  * @param argv command line arguments
2018  * @return 0 ok, 1 on error
2019  */
2020 int
2021 main (int argc,
2022       char *const *argv)
2023 {
2024   int ret;
2025
2026   ret = GNUNET_SERVICE_run (argc, argv, "set",
2027                             GNUNET_SERVICE_OPTION_NONE,
2028                             &run, NULL);
2029   return (GNUNET_OK == ret) ? 0 : 1;
2030 }
2031
2032 /* end of gnunet-service-set.c */