log: add \n
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013, 2014 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_protocol.h"
28 #include "gnunet_statistics_service.h"
29
30 /**
31  * How long do we hold on to an incoming channel if there is
32  * no local listener before giving up?
33  */
34 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35
36 /**
37  * A listener is inhabited by a client, and waits for evaluation
38  * requests from remote peers.
39  */
40 struct Listener
41 {
42   /**
43    * Listeners are held in a doubly linked list.
44    */
45   struct Listener *next;
46
47   /**
48    * Listeners are held in a doubly linked list.
49    */
50   struct Listener *prev;
51
52   /**
53    * Client that owns the listener.
54    * Only one client may own a listener.
55    */
56   struct GNUNET_SERVER_Client *client;
57
58   /**
59    * Message queue for the client
60    */
61   struct GNUNET_MQ_Handle *client_mq;
62
63   /**
64    * Application ID for the operation, used to distinguish
65    * multiple operations of the same type with the same peer.
66    */
67   struct GNUNET_HashCode app_id;
68
69   /**
70    * The port we are listening on with CADET.
71    */
72   struct GNUNET_CADET_Port *open_port;
73
74   /**
75    * The type of the operation.
76    */
77   enum GNUNET_SET_OperationType operation;
78 };
79
80
81 struct LazyCopyRequest
82 {
83   struct Set *source_set;
84   uint32_t cookie;
85
86   struct LazyCopyRequest *prev;
87   struct LazyCopyRequest *next;
88 };
89
90
91 /**
92  * Configuration of our local peer.
93  */
94 static const struct GNUNET_CONFIGURATION_Handle *configuration;
95
96 /**
97  * Handle to the cadet service, used to listen for and connect to
98  * remote peers.
99  */
100 static struct GNUNET_CADET_Handle *cadet;
101
102 /**
103  * Sets are held in a doubly linked list.
104  */
105 static struct Set *sets_head;
106
107 /**
108  * Sets are held in a doubly linked list.
109  */
110 static struct Set *sets_tail;
111
112 /**
113  * Listeners are held in a doubly linked list.
114  */
115 static struct Listener *listeners_head;
116
117 /**
118  * Listeners are held in a doubly linked list.
119  */
120 static struct Listener *listeners_tail;
121
122 /**
123  * Incoming sockets from remote peers are held in a doubly linked
124  * list.
125  */
126 static struct Operation *incoming_head;
127
128 /**
129  * Incoming sockets from remote peers are held in a doubly linked
130  * list.
131  */
132 static struct Operation *incoming_tail;
133
134 static struct LazyCopyRequest *lazy_copy_head;
135 static struct LazyCopyRequest *lazy_copy_tail;
136
137 static uint32_t lazy_copy_cookie = 1;
138
139 /**
140  * Counter for allocating unique IDs for clients, used to identify
141  * incoming operation requests from remote peers, that the client can
142  * choose to accept or refuse.
143  */
144 static uint32_t suggest_id = 1;
145
146 /**
147  * Statistics handle.
148  */
149 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
150
151
152 /**
153  * Get set that is owned by the given client, if any.
154  *
155  * @param client client to look for
156  * @return set that the client owns, NULL if the client
157  *         does not own a set
158  */
159 static struct Set *
160 set_get (struct GNUNET_SERVER_Client *client)
161 {
162   struct Set *set;
163
164   for (set = sets_head; NULL != set; set = set->next)
165     if (set->client == client)
166       return set;
167   return NULL;
168 }
169
170
171 /**
172  * Get the listener associated with the given client, if any.
173  *
174  * @param client the client
175  * @return listener associated with the client, NULL
176  *         if there isn't any
177  */
178 static struct Listener *
179 listener_get (struct GNUNET_SERVER_Client *client)
180 {
181   struct Listener *listener;
182
183   for (listener = listeners_head; NULL != listener; listener = listener->next)
184     if (listener->client == client)
185       return listener;
186   return NULL;
187 }
188
189
190 /**
191  * Get the incoming socket associated with the given id.
192  *
193  * @param id id to look for
194  * @return the incoming socket associated with the id,
195  *         or NULL if there is none
196  */
197 static struct Operation *
198 get_incoming (uint32_t id)
199 {
200   struct Operation *op;
201
202   for (op = incoming_head; NULL != op; op = op->next)
203     if (op->suggest_id == id)
204     {
205       GNUNET_assert (GNUNET_YES == op->is_incoming);
206       return op;
207     }
208   return NULL;
209 }
210
211
212 /**
213  * Destroy a listener, free all resources associated with it.
214  *
215  * @param listener listener to destroy
216  */
217 static void
218 listener_destroy (struct Listener *listener)
219 {
220   /* If the client is not dead yet, destroy it.
221    * The client's destroy callback will destroy the listener again. */
222   if (NULL != listener->client)
223   {
224     struct GNUNET_SERVER_Client *client = listener->client;
225
226     listener->client = NULL;
227     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
228                 "Disconnecting listener client\n");
229     GNUNET_SERVER_client_disconnect (client);
230     return;
231   }
232   if (NULL != listener->client_mq)
233   {
234     GNUNET_MQ_destroy (listener->client_mq);
235     listener->client_mq = NULL;
236   }
237   GNUNET_CADET_close_port (listener->open_port);
238   GNUNET_CONTAINER_DLL_remove (listeners_head,
239                                listeners_tail,
240                                listener);
241   GNUNET_free (listener);
242 }
243
244
245 /**
246  * Context for the #garbage_collect_cb().
247  */
248 struct GarbageContext
249 {
250
251   /**
252    * Map for which we are garbage collecting removed elements.
253    */
254   struct GNUNET_CONTAINER_MultiHashMap *map;
255
256   /**
257    * Lowest generation for which an operation is still pending.
258    */
259   unsigned int min_op_generation;
260
261   /**
262    * Largest generation for which an operation is still pending.
263    */
264   unsigned int max_op_generation;
265
266 };
267
268
269 /**
270  * Function invoked to check if an element can be removed from
271  * the set's history because it is no longer needed.
272  *
273  * @param cls the `struct GarbageContext *`
274  * @param key key of the element in the map
275  * @param value the `struct ElementEntry *`
276  * @return #GNUNET_OK (continue to iterate)
277  */
278 static int
279 garbage_collect_cb (void *cls,
280                     const struct GNUNET_HashCode *key,
281                     void *value)
282 {
283   //struct GarbageContext *gc = cls;
284   //struct ElementEntry *ee = value;
285
286   //if (GNUNET_YES != ee->removed)
287   //  return GNUNET_OK;
288   //if ( (gc->max_op_generation < ee->generation_added) ||
289   //     (ee->generation_removed > gc->min_op_generation) )
290   //{
291   //  GNUNET_assert (GNUNET_YES ==
292   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
293   //                                                       key,
294   //                                                       ee));
295   //  GNUNET_free (ee);
296   //}
297   return GNUNET_OK;
298 }
299
300
301 /**
302  * Collect and destroy elements that are not needed anymore, because
303  * their lifetime (as determined by their generation) does not overlap
304  * with any active set operation.
305  *
306  * @param set set to garbage collect
307  */
308 static void
309 collect_generation_garbage (struct Set *set)
310 {
311   struct Operation *op;
312   struct GarbageContext gc;
313
314   gc.min_op_generation = UINT_MAX;
315   gc.max_op_generation = 0;
316   for (op = set->ops_head; NULL != op; op = op->next)
317   {
318     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
319                                        op->generation_created);
320     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
321                                        op->generation_created);
322   }
323   gc.map = set->content->elements;
324   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
325                                          &garbage_collect_cb,
326                                          &gc);
327 }
328
329
330 static int
331 is_excluded_generation (unsigned int generation,
332                         struct GenerationRange *excluded,
333                         unsigned int excluded_size)
334 {
335   unsigned int i;
336
337   for (i = 0; i < excluded_size; i++)
338   {
339     if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
340       return GNUNET_YES;
341   }
342
343   return GNUNET_NO;
344 }
345
346
347 static int
348 is_element_of_generation (struct ElementEntry *ee,
349                           unsigned int query_generation,
350                           struct GenerationRange *excluded,
351                           unsigned int excluded_size)
352 {
353   struct MutationEvent *mut;
354   int is_present;
355   unsigned int i;
356
357   GNUNET_assert (NULL != ee->mutations);
358
359   if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size))
360   {
361     GNUNET_break (0);
362     return GNUNET_NO;
363   }
364
365   is_present = GNUNET_NO;
366
367   /* Could be made faster with binary search, but lists
368      are small, so why bother. */
369   for (i = 0; i < ee->mutations_size; i++)
370   {
371     mut = &ee->mutations[i];
372
373     if (mut->generation > query_generation)
374     {
375       /* The mutation doesn't apply to our generation
376          anymore.  We can'b break here, since mutations aren't
377          sorted by generation. */
378       continue;
379     }
380
381     if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size))
382     {
383       /* The generation is excluded (because it belongs to another
384          fork via a lazy copy) and thus mutations aren't considered
385          for membership testing. */
386       continue;
387     }
388
389     /* This would be an inconsistency in how we manage mutations. */
390     if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) )
391       GNUNET_assert (0);
392
393     /* Likewise. */
394     if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) )
395       GNUNET_assert (0);
396
397     is_present = mut->added;
398   }
399
400   return is_present;
401 }
402
403
404 int
405 _GSS_is_element_of_set (struct ElementEntry *ee,
406                         struct Set *set)
407 {
408   return is_element_of_generation (ee,
409                                    set->current_generation,
410                                    set->excluded_generations,
411                                    set->excluded_generations_size);
412 }
413
414
415 static int
416 is_element_of_iteration (struct ElementEntry *ee,
417                          struct Set *set)
418 {
419   return is_element_of_generation (ee,
420                                    set->iter_generation,
421                                    set->excluded_generations,
422                                    set->excluded_generations_size);
423 }
424
425
426 int
427 _GSS_is_element_of_operation (struct ElementEntry *ee,
428                               struct Operation *op)
429 {
430   return is_element_of_generation (ee,
431                                    op->generation_created,
432                                    op->spec->set->excluded_generations,
433                                    op->spec->set->excluded_generations_size);
434 }
435
436
437 /**
438  * Destroy the given operation.  Call the implementation-specific
439  * cancel function of the operation.  Disconnects from the remote
440  * peer.  Does not disconnect the client, as there may be multiple
441  * operations per set.
442  *
443  * @param op operation to destroy
444  * @param gc #GNUNET_YES to perform garbage collection on the set
445  */
446 void
447 _GSS_operation_destroy (struct Operation *op,
448                         int gc)
449 {
450   struct Set *set;
451   struct GNUNET_CADET_Channel *channel;
452
453   if (NULL == op->vt)
454   {
455     /* already in #_GSS_operation_destroy() */
456     return;
457   }
458   GNUNET_assert (GNUNET_NO == op->is_incoming);
459   GNUNET_assert (NULL != op->spec);
460   set = op->spec->set;
461   GNUNET_CONTAINER_DLL_remove (set->ops_head,
462                                set->ops_tail,
463                                op);
464   op->vt->cancel (op);
465   op->vt = NULL;
466   if (NULL != op->spec)
467   {
468     if (NULL != op->spec->context_msg)
469     {
470       GNUNET_free (op->spec->context_msg);
471       op->spec->context_msg = NULL;
472     }
473     GNUNET_free (op->spec);
474     op->spec = NULL;
475   }
476   if (NULL != op->mq)
477   {
478     GNUNET_MQ_destroy (op->mq);
479     op->mq = NULL;
480   }
481   if (NULL != (channel = op->channel))
482   {
483     op->channel = NULL;
484     GNUNET_CADET_channel_destroy (channel);
485   }
486   if (GNUNET_YES == gc)
487     collect_generation_garbage (set);
488   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
489    * there was a channel end handler that will free 'op' on the call stack. */
490 }
491
492
493 /**
494  * Iterator over hash map entries to free element entries.
495  *
496  * @param cls closure
497  * @param key current key code
498  * @param value a `struct ElementEntry *` to be free'd
499  * @return #GNUNET_YES (continue to iterate)
500  */
501 static int
502 destroy_elements_iterator (void *cls,
503                            const struct GNUNET_HashCode *key,
504                            void *value)
505 {
506   struct ElementEntry *ee = value;
507
508   GNUNET_free_non_null (ee->mutations);
509
510   GNUNET_free (ee);
511   return GNUNET_YES;
512 }
513
514
515 /**
516  * Destroy a set, and free all resources and operations associated with it.
517  *
518  * @param set the set to destroy
519  */
520 static void
521 set_destroy (struct Set *set)
522 {
523   if (NULL != set->client)
524   {
525     /* If the client is not dead yet, destroy it.  The client's destroy
526      * callback will call `set_destroy()` again in this case.  We do
527      * this so that the channel end handler still has a valid set handle
528      * to destroy. */
529     struct GNUNET_SERVER_Client *client = set->client;
530
531     set->client = NULL;
532     GNUNET_SERVER_client_disconnect (client);
533     return;
534   }
535   GNUNET_assert (NULL != set->state);
536   while (NULL != set->ops_head)
537     _GSS_operation_destroy (set->ops_head, GNUNET_NO);
538   set->vt->destroy_set (set->state);
539   set->state = NULL;
540   if (NULL != set->client_mq)
541   {
542     GNUNET_MQ_destroy (set->client_mq);
543     set->client_mq = NULL;
544   }
545   if (NULL != set->iter)
546   {
547     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
548     set->iter = NULL;
549     set->iteration_id++;
550   }
551   {
552     struct SetContent *content;
553     struct PendingMutation *pm;
554     struct PendingMutation *pm_current;
555
556     content = set->content;
557
558     // discard any pending mutations that reference this set
559     pm = content->pending_mutations_head;
560     while (NULL != pm)
561     {
562       pm_current = pm;
563       pm = pm->next;
564       if (pm_current-> set == set)
565         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
566                                      content->pending_mutations_tail,
567                                      pm_current);
568
569     }
570
571     set->content = NULL;
572     GNUNET_assert (0 != content->refcount);
573     content->refcount -= 1;
574     if (0 == content->refcount)
575     {
576       GNUNET_assert (NULL != content->elements);
577       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
578                                              &destroy_elements_iterator,
579                                              NULL);
580       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
581       content->elements = NULL;
582       GNUNET_free (content);
583     }
584   }
585   GNUNET_free_non_null (set->excluded_generations);
586   set->excluded_generations = NULL;
587   GNUNET_CONTAINER_DLL_remove (sets_head,
588                                sets_tail,
589                                set);
590
591   // remove set from pending copy requests
592   {
593     struct LazyCopyRequest *lcr;
594     lcr = lazy_copy_head;
595     while (NULL != lcr)
596     {
597       struct LazyCopyRequest *lcr_current;
598       lcr_current = lcr;
599       lcr = lcr->next;
600       if (lcr_current->source_set == set)
601         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
602                                      lazy_copy_tail,
603                                      lcr_current);
604     }
605   }
606
607   GNUNET_free (set);
608 }
609
610
611 /**
612  * Clean up after a client has disconnected
613  *
614  * @param cls closure, unused
615  * @param client the client to clean up after
616  */
617 static void
618 handle_client_disconnect (void *cls,
619                           struct GNUNET_SERVER_Client *client)
620 {
621   struct Listener *listener;
622   struct Set *set;
623
624   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
625               "client disconnected, cleaning up\n");
626   set = set_get (client);
627   if (NULL != set)
628   {
629     set->client = NULL;
630     set_destroy (set);
631     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632                 "Client's set destroyed\n");
633   }
634   listener = listener_get (client);
635   if (NULL != listener)
636   {
637     listener->client = NULL;
638     listener_destroy (listener);
639     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
640                 "Client's listener destroyed\n");
641   }
642 }
643
644
645 /**
646  * Destroy an incoming request from a remote peer
647  *
648  * @param incoming remote request to destroy
649  */
650 static void
651 incoming_destroy (struct Operation *incoming)
652 {
653   struct GNUNET_CADET_Channel *channel;
654
655   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
656   GNUNET_CONTAINER_DLL_remove (incoming_head,
657                                incoming_tail,
658                                incoming);
659   if (NULL != incoming->timeout_task)
660   {
661     GNUNET_SCHEDULER_cancel (incoming->timeout_task);
662     incoming->timeout_task = NULL;
663   }
664   /* make sure that the tunnel end handler will not destroy us again */
665   incoming->vt = NULL;
666   if (NULL != incoming->spec)
667   {
668     GNUNET_free (incoming->spec);
669     incoming->spec = NULL;
670   }
671   if (NULL != incoming->mq)
672   {
673     GNUNET_MQ_destroy (incoming->mq);
674     incoming->mq = NULL;
675   }
676   if (NULL != (channel = incoming->channel))
677   {
678     incoming->channel = NULL;
679     GNUNET_CADET_channel_destroy (channel);
680   }
681 }
682
683
684 /**
685  * Suggest the given request to the listener. The listening client can
686  * then accept or reject the remote request.
687  *
688  * @param incoming the incoming peer with the request to suggest
689  * @param listener the listener to suggest the request to
690  */
691 static void
692 incoming_suggest (struct Operation *incoming,
693                   struct Listener *listener)
694 {
695   struct GNUNET_MQ_Envelope *mqm;
696   struct GNUNET_SET_RequestMessage *cmsg;
697
698   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
699   GNUNET_assert (NULL != incoming->spec);
700   GNUNET_assert (0 == incoming->suggest_id);
701   incoming->suggest_id = suggest_id++;
702   if (0 == suggest_id)
703     suggest_id++;
704   GNUNET_assert (NULL != incoming->timeout_task);
705   GNUNET_SCHEDULER_cancel (incoming->timeout_task);
706   incoming->timeout_task = NULL;
707   mqm = GNUNET_MQ_msg_nested_mh (cmsg,
708                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
709                                  incoming->spec->context_msg);
710   GNUNET_assert (NULL != mqm);
711   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
712               "Suggesting incoming request with accept id %u to listener\n",
713               incoming->suggest_id);
714   cmsg->accept_id = htonl (incoming->suggest_id);
715   cmsg->peer_id = incoming->spec->peer;
716   GNUNET_MQ_send (listener->client_mq,
717                   mqm);
718 }
719
720
721 /**
722  * Handle a request for a set operation from another peer.  Checks if we
723  * have a listener waiting for such a request (and in that case initiates
724  * asking the listener about accepting the connection). If no listener
725  * is waiting, we queue the operation request in hope that a listener
726  * shows up soon (before timeout).
727  *
728  * This msg is expected as the first and only msg handled through the
729  * non-operation bound virtual table, acceptance of this operation replaces
730  * our virtual table and subsequent msgs would be routed differently (as
731  * we then know what type of operation this is).
732  *
733  * @param op the operation state
734  * @param mh the received message
735  * @return #GNUNET_OK if the channel should be kept alive,
736  *         #GNUNET_SYSERR to destroy the channel
737  */
738 static int
739 handle_incoming_msg (struct Operation *op,
740                      const struct GNUNET_MessageHeader *mh)
741 {
742   const struct OperationRequestMessage *msg;
743   struct Listener *listener = op->listener;
744   struct OperationSpecification *spec;
745   const struct GNUNET_MessageHeader *nested_context;
746
747   msg = (const struct OperationRequestMessage *) mh;
748   GNUNET_assert (GNUNET_YES == op->is_incoming);
749   if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
750   {
751     GNUNET_break_op (0);
752     return GNUNET_SYSERR;
753   }
754   /* double operation request */
755   if (NULL != op->spec)
756   {
757     GNUNET_break_op (0);
758     return GNUNET_SYSERR;
759   }
760   spec = GNUNET_new (struct OperationSpecification);
761   nested_context = GNUNET_MQ_extract_nested_mh (msg);
762   if ( (NULL != nested_context) &&
763        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
764   {
765     GNUNET_break_op (0);
766     GNUNET_free (spec);
767     return GNUNET_SYSERR;
768   }
769   /* Make a copy of the nested_context (application-specific context
770      information that is opaque to set) so we can pass it to the
771      listener later on */
772   if (NULL != nested_context)
773     spec->context_msg = GNUNET_copy_message (nested_context);
774   spec->operation = ntohl (msg->operation);
775   spec->app_id = listener->app_id;
776   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
777                                          UINT32_MAX);
778   spec->peer = op->peer;
779   spec->remote_element_count = ntohl (msg->element_count);
780   op->spec = spec;
781
782   listener = op->listener;
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "Received P2P operation request (op %u, port %s) for active listener\n",
785               ntohl (msg->operation),
786               GNUNET_h2s (&listener->app_id));
787   incoming_suggest (op,
788                     listener);
789   return GNUNET_OK;
790 }
791
792
793 static void
794 execute_add (struct Set *set,
795              const struct GNUNET_MessageHeader *m)
796 {
797   const struct GNUNET_SET_ElementMessage *msg;
798   struct GNUNET_SET_Element el;
799   struct ElementEntry *ee;
800   struct GNUNET_HashCode hash;
801
802   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type));
803
804   msg = (const struct GNUNET_SET_ElementMessage *) m;
805   el.size = ntohs (m->size) - sizeof *msg;
806   el.data = &msg[1];
807   el.element_type = ntohs (msg->element_type);
808   GNUNET_SET_element_hash (&el, &hash);
809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810               "Client inserts element %s of size %u\n",
811               GNUNET_h2s (&hash),
812               el.size);
813
814   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
815                                           &hash);
816
817   if (NULL == ee)
818   {
819     ee = GNUNET_malloc (el.size + sizeof *ee);
820     ee->element.size = el.size;
821     GNUNET_memcpy (&ee[1],
822             el.data,
823             el.size);
824     ee->element.data = &ee[1];
825     ee->element.element_type = el.element_type;
826     ee->remote = GNUNET_NO;
827     ee->mutations = NULL;
828     ee->mutations_size = 0;
829     ee->element_hash = hash;
830     GNUNET_break (GNUNET_YES ==
831                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
832                                                      &ee->element_hash,
833                                                      ee,
834                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
835   }
836   else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
837   {
838     /* same element inserted twice */
839     return;
840   }
841
842   {
843     struct MutationEvent mut = {
844       .generation = set->current_generation,
845       .added = GNUNET_YES
846     };
847     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
848   }
849
850   set->vt->add (set->state, ee);
851 }
852
853
854 static void
855 execute_remove (struct Set *set,
856                 const struct GNUNET_MessageHeader *m)
857 {
858   const struct GNUNET_SET_ElementMessage *msg;
859   struct GNUNET_SET_Element el;
860   struct ElementEntry *ee;
861   struct GNUNET_HashCode hash;
862
863   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type));
864
865   msg = (const struct GNUNET_SET_ElementMessage *) m;
866   el.size = ntohs (m->size) - sizeof *msg;
867   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868               "Client removes element of size %u\n",
869               el.size);
870   el.data = &msg[1];
871   el.element_type = ntohs (msg->element_type);
872   GNUNET_SET_element_hash (&el, &hash);
873   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
874                                           &hash);
875   if (NULL == ee)
876   {
877     /* Client tried to remove non-existing element. */
878     return;
879   }
880   if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
881   {
882     /* Client tried to remove element twice */
883     return;
884   }
885   else
886   {
887     struct MutationEvent mut = {
888       .generation = set->current_generation,
889       .added = GNUNET_NO
890     };
891     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
892   }
893   set->vt->remove (set->state, ee);
894 }
895
896
897
898 static void
899 execute_mutation (struct Set *set,
900                   const struct GNUNET_MessageHeader *m)
901 {
902   switch (ntohs (m->type))
903   {
904     case GNUNET_MESSAGE_TYPE_SET_ADD:
905       execute_add (set, m);
906       break;
907     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
908       execute_remove (set, m);
909       break;
910     default:
911       GNUNET_break (0);
912   }
913 }
914
915
916
917 /**
918  * Send the next element of a set to the set's client.  The next element is given by
919  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
920  * are no more elements in the set.  The caller must ensure that the set's iterator is
921  * valid.
922  *
923  * The client will acknowledge each received element with a
924  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
925  * #handle_client_iter_ack() will then trigger the next transmission.
926  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
927  *
928  * @param set set that should send its next element to its client
929  */
930 static void
931 send_client_element (struct Set *set)
932 {
933   int ret;
934   struct ElementEntry *ee;
935   struct GNUNET_MQ_Envelope *ev;
936   struct GNUNET_SET_IterResponseMessage *msg;
937
938   GNUNET_assert (NULL != set->iter);
939
940 again:
941
942   ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
943                                                      NULL,
944                                                      (const void **) &ee);
945   if (GNUNET_NO == ret)
946   {
947     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948                 "Iteration on %p done.\n",
949                 (void *) set);
950     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
951     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
952     set->iter = NULL;
953     set->iteration_id++;
954
955     GNUNET_assert (set->content->iterator_count > 0);
956     set->content->iterator_count -= 1;
957
958     if (0 == set->content->iterator_count)
959     {
960       while (NULL != set->content->pending_mutations_head)
961       {
962         struct PendingMutation *pm;
963
964         pm = set->content->pending_mutations_head;
965         GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
966                                      set->content->pending_mutations_tail,
967                                      pm);
968         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969                     "Executing pending mutation on %p.\n",
970                     (void *) pm->set);
971         execute_mutation (pm->set, pm->mutation_message);
972         GNUNET_free (pm->mutation_message);
973         GNUNET_free (pm);
974       }
975     }
976
977   }
978   else
979   {
980     GNUNET_assert (NULL != ee);
981
982     if (GNUNET_NO == is_element_of_iteration (ee, set))
983       goto again;
984
985     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
986                 "Sending iteration element on %p.\n",
987                 (void *) set);
988     ev = GNUNET_MQ_msg_extra (msg,
989                               ee->element.size,
990                               GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
991     GNUNET_memcpy (&msg[1],
992             ee->element.data,
993             ee->element.size);
994     msg->element_type = htons (ee->element.element_type);
995     msg->iteration_id = htons (set->iteration_id);
996   }
997   GNUNET_MQ_send (set->client_mq, ev);
998 }
999
1000
1001 /**
1002  * Called when a client wants to iterate the elements of a set.
1003  * Checks if we have a set associated with the client and if we
1004  * can right now start an iteration. If all checks out, starts
1005  * sending the elements of the set to the client.
1006  *
1007  * @param cls unused
1008  * @param client client that sent the message
1009  * @param m message sent by the client
1010  */
1011 static void
1012 handle_client_iterate (void *cls,
1013                        struct GNUNET_SERVER_Client *client,
1014                        const struct GNUNET_MessageHeader *m)
1015 {
1016   struct Set *set;
1017
1018   set = set_get (client);
1019   if (NULL == set)
1020   {
1021     /* attempt to iterate over a non existing set */
1022     GNUNET_break (0);
1023     GNUNET_SERVER_client_disconnect (client);
1024     return;
1025   }
1026   if (NULL != set->iter)
1027   {
1028     /* Only one concurrent iterate-action allowed per set */
1029     GNUNET_break (0);
1030     GNUNET_SERVER_client_disconnect (client);
1031     return;
1032   }
1033   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1034               "Iterating set %p in gen %u with %u content elements\n",
1035               (void *) set,
1036               set->current_generation,
1037               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1038   GNUNET_SERVER_receive_done (client,
1039                               GNUNET_OK);
1040   set->content->iterator_count += 1;
1041   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1042   set->iter_generation = set->current_generation;
1043   send_client_element (set);
1044 }
1045
1046
1047 /**
1048  * Called when a client wants to create a new set.  This is typically
1049  * the first request from a client, and includes the type of set
1050  * operation to be performed.
1051  *
1052  * @param cls unused
1053  * @param client client that sent the message
1054  * @param m message sent by the client
1055  */
1056 static void
1057 handle_client_create_set (void *cls,
1058                           struct GNUNET_SERVER_Client *client,
1059                           const struct GNUNET_MessageHeader *m)
1060 {
1061   const struct GNUNET_SET_CreateMessage *msg;
1062   struct Set *set;
1063
1064   msg = (const struct GNUNET_SET_CreateMessage *) m;
1065   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1066               "Client created new set (operation %u)\n",
1067               ntohl (msg->operation));
1068   if (NULL != set_get (client))
1069   {
1070     /* There can only be one set per client */
1071     GNUNET_break (0);
1072     GNUNET_SERVER_client_disconnect (client);
1073     return;
1074   }
1075   set = GNUNET_new (struct Set);
1076   switch (ntohl (msg->operation))
1077   {
1078   case GNUNET_SET_OPERATION_INTERSECTION:
1079     set->vt = _GSS_intersection_vt ();
1080     break;
1081   case GNUNET_SET_OPERATION_UNION:
1082     set->vt = _GSS_union_vt ();
1083     break;
1084   default:
1085     GNUNET_free (set);
1086     GNUNET_break (0);
1087     GNUNET_SERVER_client_disconnect (client);
1088     return;
1089   }
1090   set->operation = ntohl (msg->operation);
1091   set->state = set->vt->create ();
1092   if (NULL == set->state)
1093   {
1094     /* initialization failed (i.e. out of memory) */
1095     GNUNET_free (set);
1096     GNUNET_SERVER_client_disconnect (client);
1097     return;
1098   }
1099   set->content = GNUNET_new (struct SetContent);
1100   set->content->refcount = 1;
1101   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1102   set->client = client;
1103   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
1104   GNUNET_CONTAINER_DLL_insert (sets_head,
1105                                sets_tail,
1106                                set);
1107   GNUNET_SERVER_receive_done (client,
1108                               GNUNET_OK);
1109 }
1110
1111
1112 /**
1113  * Timeout happens iff:
1114  *  - we suggested an operation to our listener,
1115  *    but did not receive a response in time
1116  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1117  *
1118  * @param cls channel context
1119  * @param tc context information (why was this task triggered now)
1120  */
1121 static void
1122 incoming_timeout_cb (void *cls)
1123 {
1124   struct Operation *incoming = cls;
1125
1126   incoming->timeout_task = NULL;
1127   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1128   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129               "Remote peer's incoming request timed out\n");
1130   incoming_destroy (incoming);
1131 }
1132
1133
1134 /**
1135  * Terminates an incoming operation in case we have not yet received an
1136  * operation request. Called by the channel destruction handler.
1137  *
1138  * @param op the channel context
1139  */
1140 static void
1141 handle_incoming_disconnect (struct Operation *op)
1142 {
1143   GNUNET_assert (GNUNET_YES == op->is_incoming);
1144   /* channel is already dead, incoming_destroy must not
1145    * destroy it ... */
1146   op->channel = NULL;
1147   incoming_destroy (op);
1148   op->vt = NULL;
1149 }
1150
1151
1152 /**
1153  * Method called whenever another peer has added us to a channel the
1154  * other peer initiated.  Only called (once) upon reception of data
1155  * from a channel we listen on.
1156  *
1157  * The channel context represents the operation itself and gets added
1158  * to a DLL, from where it gets looked up when our local listener
1159  * client responds to a proposed/suggested operation or connects and
1160  * associates with this operation.
1161  *
1162  * @param cls closure
1163  * @param channel new handle to the channel
1164  * @param initiator peer that started the channel
1165  * @param port Port this channel is for.
1166  * @param options Unused.
1167  * @return initial channel context for the channel
1168  *         returns NULL on error
1169  */
1170 static void *
1171 channel_new_cb (void *cls,
1172                 struct GNUNET_CADET_Channel *channel,
1173                 const struct GNUNET_PeerIdentity *initiator,
1174                 const struct GNUNET_HashCode *port,
1175                 enum GNUNET_CADET_ChannelOption options)
1176 {
1177   static const struct SetVT incoming_vt = {
1178     .msg_handler = &handle_incoming_msg,
1179     .peer_disconnect = &handle_incoming_disconnect
1180   };
1181   struct Listener *listener = cls;
1182   struct Operation *incoming;
1183
1184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185               "New incoming channel\n");
1186   incoming = GNUNET_new (struct Operation);
1187   incoming->listener = listener;
1188   incoming->is_incoming = GNUNET_YES;
1189   incoming->peer = *initiator;
1190   incoming->channel = channel;
1191   incoming->mq = GNUNET_CADET_mq_create (incoming->channel);
1192   incoming->vt = &incoming_vt;
1193   incoming->timeout_task
1194     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1195                                     &incoming_timeout_cb,
1196                                     incoming);
1197   GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
1198                                     incoming_tail,
1199                                     incoming);
1200   // incoming_suggest (incoming,
1201   //                  listener);
1202   return incoming;
1203 }
1204
1205
1206 /**
1207  * Called when a client wants to create a new listener.
1208  *
1209  * @param cls unused
1210  * @param client client that sent the message
1211  * @param m message sent by the client
1212  */
1213 static void
1214 handle_client_listen (void *cls,
1215                       struct GNUNET_SERVER_Client *client,
1216                       const struct GNUNET_MessageHeader *m)
1217 {
1218   const struct GNUNET_SET_ListenMessage *msg;
1219   struct Listener *listener;
1220   struct Operation *op;
1221
1222   msg = (const struct GNUNET_SET_ListenMessage *) m;
1223   if (NULL != listener_get (client))
1224   {
1225     /* max. one active listener per client! */
1226     GNUNET_break (0);
1227     GNUNET_SERVER_client_disconnect (client);
1228     return;
1229   }
1230   listener = GNUNET_new (struct Listener);
1231   listener->client = client;
1232   listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
1233   listener->app_id = msg->app_id;
1234   listener->operation = ntohl (msg->operation);
1235   GNUNET_CONTAINER_DLL_insert_tail (listeners_head,
1236                                     listeners_tail,
1237                                     listener);
1238   listener->open_port = GNUNET_CADET_open_port (cadet,
1239                                                 &msg->app_id,
1240                                                 &channel_new_cb,
1241                                                 listener);
1242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1243               "New listener created (op %u, port %s)\n",
1244               listener->operation,
1245               GNUNET_h2s (&listener->app_id));
1246
1247   /* check for existing incoming requests the listener might be interested in */
1248   for (op = incoming_head; NULL != op; op = op->next)
1249   {
1250     if (NULL == op->spec)
1251       continue; /* no details available yet */
1252     if (0 != op->suggest_id)
1253       continue; /* this one has been already suggested to a listener */
1254     if (listener->operation != op->spec->operation)
1255       continue; /* incompatible operation */
1256     if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
1257                                      &op->spec->app_id))
1258       continue; /* incompatible appliation */
1259     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1260                 "Found matching existing request\n");
1261     incoming_suggest (op,
1262                       listener);
1263   }
1264   GNUNET_SERVER_receive_done (client,
1265                               GNUNET_OK);
1266 }
1267
1268
1269 /**
1270  * Called when the listening client rejects an operation
1271  * request by another peer.
1272  *
1273  * @param cls unused
1274  * @param client client that sent the message
1275  * @param m message sent by the client
1276  */
1277 static void
1278 handle_client_reject (void *cls,
1279                       struct GNUNET_SERVER_Client *client,
1280                       const struct GNUNET_MessageHeader *m)
1281 {
1282   struct Operation *incoming;
1283   const struct GNUNET_SET_RejectMessage *msg;
1284
1285   msg = (const struct GNUNET_SET_RejectMessage *) m;
1286   incoming = get_incoming (ntohl (msg->accept_reject_id));
1287   if (NULL == incoming)
1288   {
1289     /* no matching incoming operation for this reject */
1290     GNUNET_break (0);
1291     GNUNET_SERVER_receive_done (client,
1292                                 GNUNET_SYSERR);
1293     return;
1294   }
1295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1296               "Peer request (op %u, app %s) rejected by client\n",
1297               incoming->spec->operation,
1298               GNUNET_h2s (&incoming->spec->app_id));
1299   GNUNET_CADET_channel_destroy (incoming->channel);
1300   GNUNET_SERVER_receive_done (client,
1301                               GNUNET_OK);
1302 }
1303
1304
1305
1306 /**
1307  * Called when a client wants to add or remove an element to a set it inhabits.
1308  *
1309  * @param cls unused
1310  * @param client client that sent the message
1311  * @param m message sent by the client
1312  */
1313 static void
1314 handle_client_mutation (void *cls,
1315                         struct GNUNET_SERVER_Client *client,
1316                         const struct GNUNET_MessageHeader *m)
1317 {
1318   struct Set *set;
1319
1320   set = set_get (client);
1321   if (NULL == set)
1322   {
1323     /* client without a set requested an operation */
1324     GNUNET_break (0);
1325     GNUNET_SERVER_client_disconnect (client);
1326     return;
1327   }
1328
1329   GNUNET_SERVER_receive_done (client,
1330                               GNUNET_OK);
1331
1332   if (0 != set->content->iterator_count)
1333   {
1334     struct PendingMutation *pm;
1335
1336     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337                 "Scheduling mutation on set\n");
1338
1339     pm = GNUNET_new (struct PendingMutation);
1340     pm->mutation_message = GNUNET_copy_message (m);
1341     pm->set = set;
1342     GNUNET_CONTAINER_DLL_insert (set->content->pending_mutations_head,
1343                                  set->content->pending_mutations_tail,
1344                                  pm);
1345     return;
1346   }
1347
1348   execute_mutation (set, m);
1349 }
1350
1351
1352 /**
1353  * Advance the current generation of a set,
1354  * adding exclusion ranges if necessary.
1355  *
1356  * @param set the set where we want to advance the generation
1357  */
1358 static void
1359 advance_generation (struct Set *set)
1360 {
1361   struct GenerationRange r;
1362
1363   if (set->current_generation == set->content->latest_generation)
1364   {
1365     set->content->latest_generation += 1;
1366     set->current_generation += 1;
1367     return;
1368   }
1369
1370   GNUNET_assert (set->current_generation < set->content->latest_generation);
1371
1372   r.start = set->current_generation + 1;
1373   r.end = set->content->latest_generation + 1;
1374
1375   set->content->latest_generation = r.end;
1376   set->current_generation = r.end;
1377
1378   GNUNET_array_append (set->excluded_generations,
1379                        set->excluded_generations_size,
1380                        r);
1381 }
1382
1383
1384 /**
1385  * Called when a client wants to initiate a set operation with another
1386  * peer.  Initiates the CADET connection to the listener and sends the
1387  * request.
1388  *
1389  * @param cls unused
1390  * @param client client that sent the message
1391  * @param m message sent by the client
1392  */
1393 static void
1394 handle_client_evaluate (void *cls,
1395                         struct GNUNET_SERVER_Client *client,
1396                         const struct GNUNET_MessageHeader *m)
1397 {
1398   struct Set *set;
1399   const struct GNUNET_SET_EvaluateMessage *msg;
1400   struct OperationSpecification *spec;
1401   struct Operation *op;
1402   const struct GNUNET_MessageHeader *context;
1403
1404   set = set_get (client);
1405   if (NULL == set)
1406   {
1407     GNUNET_break (0);
1408     GNUNET_SERVER_client_disconnect (client);
1409     return;
1410   }
1411   msg = (const struct GNUNET_SET_EvaluateMessage *) m;
1412   spec = GNUNET_new (struct OperationSpecification);
1413   spec->operation = set->operation;
1414   spec->app_id = msg->app_id;
1415   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1416                                          UINT32_MAX);
1417   spec->peer = msg->target_peer;
1418   spec->set = set;
1419   spec->result_mode = ntohl (msg->result_mode);
1420   spec->client_request_id = ntohl (msg->request_id);
1421   context = GNUNET_MQ_extract_nested_mh (msg);
1422   op = GNUNET_new (struct Operation);
1423   op->spec = spec;
1424
1425   // Advance generation values, so that
1426   // mutations won't interfer with the running operation.
1427   op->generation_created = set->current_generation;
1428   advance_generation (set);
1429
1430   op->vt = set->vt;
1431   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1432                                set->ops_tail,
1433                                op);
1434   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1435               "Creating new CADET channel to port %s\n",
1436               GNUNET_h2s (&msg->app_id));
1437   op->channel = GNUNET_CADET_channel_create (cadet,
1438                                              op,
1439                                              &msg->target_peer,
1440                                              &msg->app_id,
1441                                              GNUNET_CADET_OPTION_RELIABLE);
1442   op->mq = GNUNET_CADET_mq_create (op->channel);
1443   set->vt->evaluate (op,
1444                      context);
1445   GNUNET_SERVER_receive_done (client,
1446                               GNUNET_OK);
1447 }
1448
1449
1450 /**
1451  * Handle an ack from a client, and send the next element. Note
1452  * that we only expect acks for set elements, not after the
1453  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1454  *
1455  * @param cls unused
1456  * @param client the client
1457  * @param m the message
1458  */
1459 static void
1460 handle_client_iter_ack (void *cls,
1461                         struct GNUNET_SERVER_Client *client,
1462                         const struct GNUNET_MessageHeader *m)
1463 {
1464   const struct GNUNET_SET_IterAckMessage *ack;
1465   struct Set *set;
1466
1467   set = set_get (client);
1468   if (NULL == set)
1469   {
1470     /* client without a set acknowledged receiving a value */
1471     GNUNET_break (0);
1472     GNUNET_SERVER_client_disconnect (client);
1473     return;
1474   }
1475   if (NULL == set->iter)
1476   {
1477     /* client sent an ack, but we were not expecting one (as
1478        set iteration has finished) */
1479     GNUNET_break (0);
1480     GNUNET_SERVER_client_disconnect (client);
1481     return;
1482   }
1483   ack = (const struct GNUNET_SET_IterAckMessage *) m;
1484   GNUNET_SERVER_receive_done (client,
1485                               GNUNET_OK);
1486   if (ntohl (ack->send_more))
1487   {
1488     send_client_element (set);
1489   }
1490   else
1491   {
1492     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1493     set->iter = NULL;
1494     set->iteration_id++;
1495   }
1496 }
1497
1498
1499 /**
1500  * Handle a request from the client to
1501  * copy a set.
1502  *
1503  * @param cls unused
1504  * @param client the client
1505  * @param mh the message
1506  */
1507 static void
1508 handle_client_copy_lazy_prepare (void *cls,
1509                                  struct GNUNET_SERVER_Client *client,
1510                                  const struct GNUNET_MessageHeader *mh)
1511 {
1512   struct Set *set;
1513   struct LazyCopyRequest *cr;
1514   struct GNUNET_MQ_Envelope *ev;
1515   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1516
1517   set = set_get (client);
1518   if (NULL == set)
1519   {
1520     /* client without a set requested an operation */
1521     GNUNET_break (0);
1522     GNUNET_SERVER_client_disconnect (client);
1523     return;
1524   }
1525
1526   cr = GNUNET_new (struct LazyCopyRequest);
1527
1528   cr->cookie = lazy_copy_cookie;
1529   lazy_copy_cookie += 1;
1530   cr->source_set = set;
1531
1532   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1533                                lazy_copy_tail,
1534                                cr);
1535
1536
1537   ev = GNUNET_MQ_msg (resp_msg,
1538                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1539   resp_msg->cookie = cr->cookie;
1540   GNUNET_MQ_send (set->client_mq, ev);
1541
1542
1543   GNUNET_SERVER_receive_done (client,
1544                               GNUNET_OK);
1545
1546   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1547               "Client requested lazy copy\n");
1548 }
1549
1550
1551 /**
1552  * Handle a request from the client to
1553  * connect to a copy of a set.
1554  *
1555  * @param cls unused
1556  * @param client the client
1557  * @param mh the message
1558  */
1559 static void
1560 handle_client_copy_lazy_connect (void *cls,
1561                                  struct GNUNET_SERVER_Client *client,
1562                                  const struct GNUNET_MessageHeader *mh)
1563 {
1564   struct LazyCopyRequest *cr;
1565   const struct GNUNET_SET_CopyLazyConnectMessage *msg =
1566       (const struct GNUNET_SET_CopyLazyConnectMessage *) mh;
1567   struct Set *set;
1568   int found;
1569
1570   if (NULL != set_get (client))
1571   {
1572     /* There can only be one set per client */
1573     GNUNET_break (0);
1574     GNUNET_SERVER_client_disconnect (client);
1575     return;
1576   }
1577
1578   found = GNUNET_NO;
1579
1580   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1581   {
1582     if (cr->cookie == msg->cookie)
1583     {
1584       found = GNUNET_YES;
1585       break;
1586     }
1587   }
1588
1589   if (GNUNET_NO == found)
1590   {
1591     /* client asked for copy with cookie we don't know */
1592     GNUNET_break (0);
1593     GNUNET_SERVER_client_disconnect (client);
1594     return;
1595   }
1596
1597   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1598                                lazy_copy_tail,
1599                                cr);
1600
1601   set = GNUNET_new (struct Set);
1602
1603   switch (cr->source_set->operation)
1604   {
1605   case GNUNET_SET_OPERATION_INTERSECTION:
1606     set->vt = _GSS_intersection_vt ();
1607     break;
1608   case GNUNET_SET_OPERATION_UNION:
1609     set->vt = _GSS_union_vt ();
1610     break;
1611   default:
1612     GNUNET_assert (0);
1613     return;
1614   }
1615
1616   if (NULL == set->vt->copy_state)
1617   {
1618     /* Lazy copy not supported for this set operation */
1619     GNUNET_break (0);
1620     GNUNET_free (set);
1621     GNUNET_free (cr);
1622     GNUNET_SERVER_client_disconnect (client);
1623     return;
1624   }
1625
1626   set->operation = cr->source_set->operation;
1627   set->state = set->vt->copy_state (cr->source_set);
1628   set->content = cr->source_set->content;
1629   set->content->refcount += 1;
1630
1631   set->current_generation = cr->source_set->current_generation;
1632   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1633   set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations,
1634                                              set->excluded_generations_size * sizeof (struct GenerationRange));
1635
1636   /* Advance the generation of the new set, so that mutations to the
1637      of the cloned set and the source set are independent. */
1638   advance_generation (set);
1639
1640
1641   set->client = client;
1642   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
1643   GNUNET_CONTAINER_DLL_insert (sets_head,
1644                                sets_tail,
1645                                set);
1646
1647   GNUNET_free (cr);
1648
1649   GNUNET_SERVER_receive_done (client,
1650                               GNUNET_OK);
1651
1652   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1653               "Client connected to lazy set\n");
1654 }
1655
1656
1657 /**
1658  * Handle a request from the client to
1659  * cancel a running set operation.
1660  *
1661  * @param cls unused
1662  * @param client the client
1663  * @param mh the message
1664  */
1665 static void
1666 handle_client_cancel (void *cls,
1667                       struct GNUNET_SERVER_Client *client,
1668                       const struct GNUNET_MessageHeader *mh)
1669 {
1670   const struct GNUNET_SET_CancelMessage *msg =
1671       (const struct GNUNET_SET_CancelMessage *) mh;
1672   struct Set *set;
1673   struct Operation *op;
1674   int found;
1675
1676   set = set_get (client);
1677   if (NULL == set)
1678   {
1679     /* client without a set requested an operation */
1680     GNUNET_break (0);
1681     GNUNET_SERVER_client_disconnect (client);
1682     return;
1683   }
1684   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1685               "Client requested cancel for op %u\n",
1686               ntohl (msg->request_id));
1687   found = GNUNET_NO;
1688   for (op = set->ops_head; NULL != op; op = op->next)
1689   {
1690     if (op->spec->client_request_id == ntohl (msg->request_id))
1691     {
1692       found = GNUNET_YES;
1693       break;
1694     }
1695   }
1696   if (GNUNET_NO == found)
1697   {
1698     /* It may happen that the operation was already destroyed due to
1699      * the other peer disconnecting.  The client may not know about this
1700      * yet and try to cancel the (just barely non-existent) operation.
1701      * So this is not a hard error.
1702      */
1703     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1704                 "Client canceled non-existent op\n");
1705   }
1706   else
1707   {
1708     _GSS_operation_destroy (op,
1709                             GNUNET_YES);
1710   }
1711   GNUNET_SERVER_receive_done (client,
1712                               GNUNET_OK);
1713 }
1714
1715
1716 /**
1717  * Handle a request from the client to accept a set operation that
1718  * came from a remote peer.  We forward the accept to the associated
1719  * operation for handling
1720  *
1721  * @param cls unused
1722  * @param client the client
1723  * @param mh the message
1724  */
1725 static void
1726 handle_client_accept (void *cls,
1727                       struct GNUNET_SERVER_Client *client,
1728                       const struct GNUNET_MessageHeader *mh)
1729 {
1730   struct Set *set;
1731   const struct GNUNET_SET_AcceptMessage *msg;
1732   struct Operation *op;
1733   struct GNUNET_SET_ResultMessage *result_message;
1734   struct GNUNET_MQ_Envelope *ev;
1735
1736   msg = (const struct GNUNET_SET_AcceptMessage *) mh;
1737   set = set_get (client);
1738   if (NULL == set)
1739   {
1740     /* client without a set requested to accept */
1741     GNUNET_break (0);
1742     GNUNET_SERVER_client_disconnect (client);
1743     return;
1744   }
1745   op = get_incoming (ntohl (msg->accept_reject_id));
1746   if (NULL == op)
1747   {
1748     /* It is not an error if the set op does not exist -- it may
1749      * have been destroyed when the partner peer disconnected. */
1750     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1751                 "Client accepted request that is no longer active\n");
1752     ev = GNUNET_MQ_msg (result_message,
1753                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1754     result_message->request_id = msg->request_id;
1755     result_message->element_type = 0;
1756     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1757     GNUNET_MQ_send (set->client_mq, ev);
1758     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1759     return;
1760   }
1761
1762   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1763               "Client accepting request %u\n",
1764               ntohl (msg->accept_reject_id));
1765   GNUNET_assert (GNUNET_YES == op->is_incoming);
1766   op->is_incoming = GNUNET_NO;
1767   GNUNET_CONTAINER_DLL_remove (incoming_head,
1768                                incoming_tail,
1769                                op);
1770   op->spec->set = set;
1771   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1772                                set->ops_tail,
1773                                op);
1774   op->spec->client_request_id = ntohl (msg->request_id);
1775   op->spec->result_mode = ntohl (msg->result_mode);
1776
1777   // Advance generation values, so that
1778   // mutations won't interfer with the running operation.
1779   op->generation_created = set->current_generation;
1780   advance_generation (set);
1781
1782   op->vt = set->vt;
1783   op->vt->accept (op);
1784   GNUNET_SERVER_receive_done (client,
1785                               GNUNET_OK);
1786 }
1787
1788
1789 /**
1790  * Called to clean up, after a shutdown has been requested.
1791  *
1792  * @param cls closure
1793  */
1794 static void
1795 shutdown_task (void *cls)
1796 {
1797   while (NULL != incoming_head)
1798     incoming_destroy (incoming_head);
1799   while (NULL != listeners_head)
1800     listener_destroy (listeners_head);
1801   while (NULL != sets_head)
1802     set_destroy (sets_head);
1803
1804   /* it's important to destroy cadet at the end, as all channels
1805    * must be destroyed before the cadet handle! */
1806   if (NULL != cadet)
1807   {
1808     GNUNET_CADET_disconnect (cadet);
1809     cadet = NULL;
1810   }
1811   GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
1812   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1813               "handled shutdown request\n");
1814 }
1815
1816
1817 /**
1818  * Function called whenever a channel is destroyed.  Should clean up
1819  * any associated state.  It must NOT call
1820  * GNUNET_CADET_channel_destroy() on the channel.
1821  *
1822  * The peer_disconnect function is part of a a virtual table set initially either
1823  * when a peer creates a new channel with us, or once we create
1824  * a new channel ourselves (evaluate).
1825  *
1826  * Once we know the exact type of operation (union/intersection), the vt is
1827  * replaced with an operation specific instance (_GSS_[op]_vt).
1828  *
1829  * @param cls closure (set from GNUNET_CADET_connect())
1830  * @param channel connection to the other end (henceforth invalid)
1831  * @param channel_ctx place where local state associated
1832  *                   with the channel is stored
1833  */
1834 static void
1835 channel_end_cb (void *cls,
1836                 const struct GNUNET_CADET_Channel *channel,
1837                 void *channel_ctx)
1838 {
1839   struct Operation *op = channel_ctx;
1840
1841   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1842               "channel_end_cb called\n");
1843   op->channel = NULL;
1844   op->keep++;
1845   /* the vt can be null if a client already requested canceling op. */
1846   if (NULL != op->vt)
1847   {
1848     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1849                 "calling peer disconnect due to channel end\n");
1850     op->vt->peer_disconnect (op);
1851   }
1852   op->keep--;
1853   if (0 == op->keep)
1854   {
1855     /* cadet will never call us with the context again! */
1856     GNUNET_free (op);
1857   }
1858   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1859               "channel_end_cb finished\n");
1860 }
1861
1862
1863 /**
1864  * Functions with this signature are called whenever a message is
1865  * received via a cadet channel.
1866  *
1867  * The msg_handler is a virtual table set in initially either when a peer
1868  * creates a new channel with us, or once we create a new channel
1869  * ourselves (evaluate).
1870  *
1871  * Once we know the exact type of operation (union/intersection), the vt is
1872  * replaced with an operation specific instance (_GSS_[op]_vt).
1873  *
1874  * @param cls Closure (set from GNUNET_CADET_connect()).
1875  * @param channel Connection to the other end.
1876  * @param channel_ctx Place to store local state associated with the channel.
1877  * @param message The actual message.
1878  * @return #GNUNET_OK to keep the channel open,
1879  *         #GNUNET_SYSERR to close it (signal serious error).
1880  */
1881 static int
1882 dispatch_p2p_message (void *cls,
1883                       struct GNUNET_CADET_Channel *channel,
1884                       void **channel_ctx,
1885                       const struct GNUNET_MessageHeader *message)
1886 {
1887   struct Operation *op = *channel_ctx;
1888   int ret;
1889
1890   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1891               "Dispatching cadet message (type: %u)\n",
1892               ntohs (message->type));
1893   /* do this before the handler, as the handler might kill the channel */
1894   GNUNET_CADET_receive_done (channel);
1895   if (NULL != op->vt)
1896     ret = op->vt->msg_handler (op,
1897                                message);
1898   else
1899     ret = GNUNET_SYSERR;
1900   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1901               "Handled cadet message (type: %u)\n",
1902               ntohs (message->type));
1903   return ret;
1904 }
1905
1906
1907 /**
1908  * Function called by the service's run
1909  * method to run service-specific setup code.
1910  *
1911  * @param cls closure
1912  * @param server the initialized server
1913  * @param cfg configuration to use
1914  */
1915 static void
1916 run (void *cls,
1917      struct GNUNET_SERVER_Handle *server,
1918      const struct GNUNET_CONFIGURATION_Handle *cfg)
1919 {
1920   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1921     { &handle_client_accept, NULL,
1922       GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1923       sizeof (struct GNUNET_SET_AcceptMessage)},
1924     { &handle_client_iter_ack, NULL,
1925       GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1926       sizeof (struct GNUNET_SET_IterAckMessage) },
1927     { &handle_client_mutation, NULL,
1928       GNUNET_MESSAGE_TYPE_SET_ADD,
1929       0},
1930     { &handle_client_create_set, NULL,
1931       GNUNET_MESSAGE_TYPE_SET_CREATE,
1932       sizeof (struct GNUNET_SET_CreateMessage)},
1933     { &handle_client_iterate, NULL,
1934       GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1935       sizeof (struct GNUNET_MessageHeader)},
1936     { &handle_client_evaluate, NULL,
1937       GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1938       0},
1939     { &handle_client_listen, NULL,
1940       GNUNET_MESSAGE_TYPE_SET_LISTEN,
1941       sizeof (struct GNUNET_SET_ListenMessage)},
1942     { &handle_client_reject, NULL,
1943       GNUNET_MESSAGE_TYPE_SET_REJECT,
1944       sizeof (struct GNUNET_SET_RejectMessage)},
1945     { &handle_client_mutation, NULL,
1946       GNUNET_MESSAGE_TYPE_SET_REMOVE,
1947       0},
1948     { &handle_client_cancel, NULL,
1949       GNUNET_MESSAGE_TYPE_SET_CANCEL,
1950       sizeof (struct GNUNET_SET_CancelMessage)},
1951     { &handle_client_copy_lazy_prepare, NULL,
1952       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1953       sizeof (struct GNUNET_MessageHeader)},
1954     { &handle_client_copy_lazy_connect, NULL,
1955       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1956       sizeof (struct GNUNET_SET_CopyLazyConnectMessage)},
1957     { NULL, NULL, 0, 0}
1958   };
1959   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1960     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
1961     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
1962     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
1963     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0},
1964     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0},
1965     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0},
1966     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
1967     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
1968     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
1969     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 0},
1970     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
1971     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
1972     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0},
1973     {NULL, 0, 0}
1974   };
1975
1976   configuration = cfg;
1977   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1978   GNUNET_SERVER_disconnect_notify (server,
1979                                    &handle_client_disconnect,
1980                                    NULL);
1981   GNUNET_SERVER_add_handlers (server,
1982                               server_handlers);
1983   _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
1984   cadet = GNUNET_CADET_connect (cfg,
1985                                 NULL,
1986                                 &channel_end_cb,
1987                                 cadet_handlers);
1988   if (NULL == cadet)
1989   {
1990     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1991                 _("Could not connect to cadet service\n"));
1992     return;
1993   }
1994 }
1995
1996
1997 /**
1998  * The main function for the set service.
1999  *
2000  * @param argc number of arguments from the command line
2001  * @param argv command line arguments
2002  * @return 0 ok, 1 on error
2003  */
2004 int
2005 main (int argc,
2006       char *const *argv)
2007 {
2008   int ret;
2009
2010   ret = GNUNET_SERVICE_run (argc, argv, "set",
2011                             GNUNET_SERVICE_OPTION_NONE,
2012                             &run, NULL);
2013   return (GNUNET_OK == ret) ? 0 : 1;
2014 }
2015
2016 /* end of gnunet-service-set.c */