de-duplicate operation types
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013, 2014, 2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
31
32 /**
33  * How long do we hold on to an incoming channel if there is
34  * no local listener before giving up?
35  */
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38 /**
39  * A listener is inhabited by a client, and waits for evaluation
40  * requests from remote peers.
41  */
42 struct Listener
43 {
44   /**
45    * Listeners are held in a doubly linked list.
46    */
47   struct Listener *next;
48
49   /**
50    * Listeners are held in a doubly linked list.
51    */
52   struct Listener *prev;
53
54   /**
55    * Client that owns the listener.
56    * Only one client may own a listener.
57    */
58   struct GNUNET_SERVICE_Client *client;
59
60   /**
61    * Message queue for the client
62    */
63   struct GNUNET_MQ_Handle *client_mq;
64
65   /**
66    * Application ID for the operation, used to distinguish
67    * multiple operations of the same type with the same peer.
68    */
69   struct GNUNET_HashCode app_id;
70
71   /**
72    * The port we are listening on with CADET.
73    */
74   struct GNUNET_CADET_Port *open_port;
75
76   /**
77    * The type of the operation.
78    */
79   enum GNUNET_SET_OperationType operation;
80 };
81
82
83 struct LazyCopyRequest
84 {
85   struct Set *source_set;
86   uint32_t cookie;
87
88   struct LazyCopyRequest *prev;
89   struct LazyCopyRequest *next;
90 };
91
92
93 /**
94  * Configuration of our local peer.
95  */
96 static const struct GNUNET_CONFIGURATION_Handle *configuration;
97
98 /**
99  * Handle to the cadet service, used to listen for and connect to
100  * remote peers.
101  */
102 static struct GNUNET_CADET_Handle *cadet;
103
104 /**
105  * Sets are held in a doubly linked list.
106  */
107 static struct Set *sets_head;
108
109 /**
110  * Sets are held in a doubly linked list.
111  */
112 static struct Set *sets_tail;
113
114 /**
115  * Listeners are held in a doubly linked list.
116  */
117 static struct Listener *listeners_head;
118
119 /**
120  * Listeners are held in a doubly linked list.
121  */
122 static struct Listener *listeners_tail;
123
124 /**
125  * Incoming sockets from remote peers are held in a doubly linked
126  * list.
127  */
128 static struct Operation *incoming_head;
129
130 /**
131  * Incoming sockets from remote peers are held in a doubly linked
132  * list.
133  */
134 static struct Operation *incoming_tail;
135
136 static struct LazyCopyRequest *lazy_copy_head;
137 static struct LazyCopyRequest *lazy_copy_tail;
138
139 static uint32_t lazy_copy_cookie = 1;
140
141 /**
142  * Counter for allocating unique IDs for clients, used to identify
143  * incoming operation requests from remote peers, that the client can
144  * choose to accept or refuse.
145  */
146 static uint32_t suggest_id = 1;
147
148 /**
149  * Statistics handle.
150  */
151 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
152
153
154 /**
155  * Get set that is owned by the given client, if any.
156  *
157  * @param client client to look for
158  * @return set that the client owns, NULL if the client
159  *         does not own a set
160  */
161 static struct Set *
162 set_get (struct GNUNET_SERVICE_Client *client)
163 {
164   for (struct Set *set = sets_head; NULL != set; set = set->next)
165     if (set->client == client)
166       return set;
167   return NULL;
168 }
169
170
171 /**
172  * Get the listener associated with the given client, if any.
173  *
174  * @param client the client
175  * @return listener associated with the client, NULL
176  *         if there isn't any
177  */
178 static struct Listener *
179 listener_get (struct GNUNET_SERVICE_Client *client)
180 {
181   for (struct Listener *listener = listeners_head;
182        NULL != listener;
183        listener = listener->next)
184     if (listener->client == client)
185       return listener;
186   return NULL;
187 }
188
189
190 /**
191  * Get the incoming socket associated with the given id.
192  *
193  * @param id id to look for
194  * @return the incoming socket associated with the id,
195  *         or NULL if there is none
196  */
197 static struct Operation *
198 get_incoming (uint32_t id)
199 {
200   for (struct Operation *op = incoming_head; NULL != op; op = op->next)
201     if (op->suggest_id == id)
202     {
203       GNUNET_assert (GNUNET_YES == op->is_incoming);
204       return op;
205     }
206   return NULL;
207 }
208
209
210 /**
211  * Destroy a listener, free all resources associated with it.
212  *
213  * @param listener listener to destroy
214  */
215 static void
216 listener_destroy (struct Listener *listener)
217 {
218   /* If the client is not dead yet, destroy it.
219    * The client's destroy callback will destroy the listener again. */
220   if (NULL != listener->client)
221   {
222     struct GNUNET_SERVICE_Client *client = listener->client;
223
224     GNUNET_MQ_destroy (listener->client_mq);
225     listener->client_mq = NULL;
226
227     listener->client = NULL;
228     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
229                 "Disconnecting listener client\n");
230     GNUNET_SERVICE_client_drop (client);
231     return;
232   }
233   GNUNET_CADET_close_port (listener->open_port);
234   GNUNET_CONTAINER_DLL_remove (listeners_head,
235                                listeners_tail,
236                                listener);
237   GNUNET_free (listener);
238 }
239
240
241 /**
242  * Context for the #garbage_collect_cb().
243  */
244 struct GarbageContext
245 {
246
247   /**
248    * Map for which we are garbage collecting removed elements.
249    */
250   struct GNUNET_CONTAINER_MultiHashMap *map;
251
252   /**
253    * Lowest generation for which an operation is still pending.
254    */
255   unsigned int min_op_generation;
256
257   /**
258    * Largest generation for which an operation is still pending.
259    */
260   unsigned int max_op_generation;
261
262 };
263
264
265 /**
266  * Function invoked to check if an element can be removed from
267  * the set's history because it is no longer needed.
268  *
269  * @param cls the `struct GarbageContext *`
270  * @param key key of the element in the map
271  * @param value the `struct ElementEntry *`
272  * @return #GNUNET_OK (continue to iterate)
273  */
274 static int
275 garbage_collect_cb (void *cls,
276                     const struct GNUNET_HashCode *key,
277                     void *value)
278 {
279   //struct GarbageContext *gc = cls;
280   //struct ElementEntry *ee = value;
281
282   //if (GNUNET_YES != ee->removed)
283   //  return GNUNET_OK;
284   //if ( (gc->max_op_generation < ee->generation_added) ||
285   //     (ee->generation_removed > gc->min_op_generation) )
286   //{
287   //  GNUNET_assert (GNUNET_YES ==
288   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
289   //                                                       key,
290   //                                                       ee));
291   //  GNUNET_free (ee);
292   //}
293   return GNUNET_OK;
294 }
295
296
297 /**
298  * Collect and destroy elements that are not needed anymore, because
299  * their lifetime (as determined by their generation) does not overlap
300  * with any active set operation.
301  *
302  * @param set set to garbage collect
303  */
304 static void
305 collect_generation_garbage (struct Set *set)
306 {
307   struct Operation *op;
308   struct GarbageContext gc;
309
310   gc.min_op_generation = UINT_MAX;
311   gc.max_op_generation = 0;
312   for (op = set->ops_head; NULL != op; op = op->next)
313   {
314     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
315                                        op->generation_created);
316     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
317                                        op->generation_created);
318   }
319   gc.map = set->content->elements;
320   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
321                                          &garbage_collect_cb,
322                                          &gc);
323 }
324
325
326 static int
327 is_excluded_generation (unsigned int generation,
328                         struct GenerationRange *excluded,
329                         unsigned int excluded_size)
330 {
331   unsigned int i;
332
333   for (i = 0; i < excluded_size; i++)
334   {
335     if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
336       return GNUNET_YES;
337   }
338
339   return GNUNET_NO;
340 }
341
342
343 static int
344 is_element_of_generation (struct ElementEntry *ee,
345                           unsigned int query_generation,
346                           struct GenerationRange *excluded,
347                           unsigned int excluded_size)
348 {
349   struct MutationEvent *mut;
350   int is_present;
351   unsigned int i;
352
353   GNUNET_assert (NULL != ee->mutations);
354
355   if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size))
356   {
357     GNUNET_break (0);
358     return GNUNET_NO;
359   }
360
361   is_present = GNUNET_NO;
362
363   /* Could be made faster with binary search, but lists
364      are small, so why bother. */
365   for (i = 0; i < ee->mutations_size; i++)
366   {
367     mut = &ee->mutations[i];
368
369     if (mut->generation > query_generation)
370     {
371       /* The mutation doesn't apply to our generation
372          anymore.  We can'b break here, since mutations aren't
373          sorted by generation. */
374       continue;
375     }
376
377     if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size))
378     {
379       /* The generation is excluded (because it belongs to another
380          fork via a lazy copy) and thus mutations aren't considered
381          for membership testing. */
382       continue;
383     }
384
385     /* This would be an inconsistency in how we manage mutations. */
386     if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) )
387       GNUNET_assert (0);
388
389     /* Likewise. */
390     if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) )
391       GNUNET_assert (0);
392
393     is_present = mut->added;
394   }
395
396   return is_present;
397 }
398
399
400 int
401 _GSS_is_element_of_set (struct ElementEntry *ee,
402                         struct Set *set)
403 {
404   return is_element_of_generation (ee,
405                                    set->current_generation,
406                                    set->excluded_generations,
407                                    set->excluded_generations_size);
408 }
409
410
411 static int
412 is_element_of_iteration (struct ElementEntry *ee,
413                          struct Set *set)
414 {
415   return is_element_of_generation (ee,
416                                    set->iter_generation,
417                                    set->excluded_generations,
418                                    set->excluded_generations_size);
419 }
420
421
422 int
423 _GSS_is_element_of_operation (struct ElementEntry *ee,
424                               struct Operation *op)
425 {
426   return is_element_of_generation (ee,
427                                    op->generation_created,
428                                    op->spec->set->excluded_generations,
429                                    op->spec->set->excluded_generations_size);
430 }
431
432
433 /**
434  * Destroy the given operation.  Call the implementation-specific
435  * cancel function of the operation.  Disconnects from the remote
436  * peer.  Does not disconnect the client, as there may be multiple
437  * operations per set.
438  *
439  * @param op operation to destroy
440  * @param gc #GNUNET_YES to perform garbage collection on the set
441  */
442 void
443 _GSS_operation_destroy (struct Operation *op,
444                         int gc)
445 {
446   struct Set *set;
447   struct GNUNET_CADET_Channel *channel;
448
449   if (NULL == op->vt)
450   {
451     /* already in #_GSS_operation_destroy() */
452     return;
453   }
454   GNUNET_assert (GNUNET_NO == op->is_incoming);
455   GNUNET_assert (NULL != op->spec);
456   set = op->spec->set;
457   GNUNET_CONTAINER_DLL_remove (set->ops_head,
458                                set->ops_tail,
459                                op);
460   op->vt->cancel (op);
461   op->vt = NULL;
462   if (NULL != op->spec)
463   {
464     if (NULL != op->spec->context_msg)
465     {
466       GNUNET_free (op->spec->context_msg);
467       op->spec->context_msg = NULL;
468     }
469     GNUNET_free (op->spec);
470     op->spec = NULL;
471   }
472   if (NULL != (channel = op->channel))
473   {
474     op->channel = NULL;
475     GNUNET_CADET_channel_destroy (channel);
476   }
477
478   if (GNUNET_YES == gc)
479     collect_generation_garbage (set);
480   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
481    * there was a channel end handler that will free 'op' on the call stack. */
482 }
483
484
485 /**
486  * Iterator over hash map entries to free element entries.
487  *
488  * @param cls closure
489  * @param key current key code
490  * @param value a `struct ElementEntry *` to be free'd
491  * @return #GNUNET_YES (continue to iterate)
492  */
493 static int
494 destroy_elements_iterator (void *cls,
495                            const struct GNUNET_HashCode *key,
496                            void *value)
497 {
498   struct ElementEntry *ee = value;
499
500   GNUNET_free_non_null (ee->mutations);
501
502   GNUNET_free (ee);
503   return GNUNET_YES;
504 }
505
506
507 /**
508  * Destroy a set, and free all resources and operations associated with it.
509  *
510  * @param set the set to destroy
511  */
512 static void
513 set_destroy (struct Set *set)
514 {
515   if (NULL != set->client)
516   {
517     /* If the client is not dead yet, destroy it.  The client's destroy
518      * callback will call `set_destroy()` again in this case.  We do
519      * this so that the channel end handler still has a valid set handle
520      * to destroy. */
521     struct GNUNET_SERVICE_Client *client = set->client;
522
523     set->client = NULL;
524     GNUNET_SERVICE_client_drop (client);
525     return;
526   }
527   GNUNET_assert (NULL != set->state);
528   while (NULL != set->ops_head)
529     _GSS_operation_destroy (set->ops_head, GNUNET_NO);
530   set->vt->destroy_set (set->state);
531   set->state = NULL;
532   if (NULL != set->iter)
533   {
534     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
535     set->iter = NULL;
536     set->iteration_id++;
537   }
538   {
539     struct SetContent *content;
540     struct PendingMutation *pm;
541     struct PendingMutation *pm_current;
542
543     content = set->content;
544
545     // discard any pending mutations that reference this set
546     pm = content->pending_mutations_head;
547     while (NULL != pm)
548     {
549       pm_current = pm;
550       pm = pm->next;
551       if (pm_current-> set == set)
552         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
553                                      content->pending_mutations_tail,
554                                      pm_current);
555
556     }
557
558     set->content = NULL;
559     GNUNET_assert (0 != content->refcount);
560     content->refcount -= 1;
561     if (0 == content->refcount)
562     {
563       GNUNET_assert (NULL != content->elements);
564       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
565                                              &destroy_elements_iterator,
566                                              NULL);
567       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
568       content->elements = NULL;
569       GNUNET_free (content);
570     }
571   }
572   GNUNET_free_non_null (set->excluded_generations);
573   set->excluded_generations = NULL;
574   GNUNET_CONTAINER_DLL_remove (sets_head,
575                                sets_tail,
576                                set);
577
578   // remove set from pending copy requests
579   {
580     struct LazyCopyRequest *lcr;
581     lcr = lazy_copy_head;
582     while (NULL != lcr)
583     {
584       struct LazyCopyRequest *lcr_current;
585       lcr_current = lcr;
586       lcr = lcr->next;
587       if (lcr_current->source_set == set)
588         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
589                                      lazy_copy_tail,
590                                      lcr_current);
591     }
592   }
593
594   GNUNET_free (set);
595 }
596
597
598 /**
599  * Callback called when a client connects to the service.
600  *
601  * @param cls closure for the service
602  * @param c the new client that connected to the service
603  * @param mq the message queue used to send messages to the client
604  * @return @a c
605  */
606 static void *
607 client_connect_cb (void *cls,
608                    struct GNUNET_SERVICE_Client *c,
609                    struct GNUNET_MQ_Handle *mq)
610 {
611   return c;
612 }
613
614
615 /**
616  * Destroy an incoming request from a remote peer
617  *
618  * @param incoming remote request to destroy
619  */
620 static void
621 incoming_destroy (struct Operation *incoming)
622 {
623   struct GNUNET_CADET_Channel *channel;
624
625   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
626   GNUNET_CONTAINER_DLL_remove (incoming_head,
627                                incoming_tail,
628                                incoming);
629   if (NULL != incoming->timeout_task)
630   {
631     GNUNET_SCHEDULER_cancel (incoming->timeout_task);
632     incoming->timeout_task = NULL;
633   }
634   /* make sure that the tunnel end handler will not destroy us again */
635   incoming->vt = NULL;
636   if (NULL != incoming->spec)
637   {
638     GNUNET_free (incoming->spec);
639     incoming->spec = NULL;
640   }
641   if (NULL != (channel = incoming->channel))
642   {
643     incoming->channel = NULL;
644     GNUNET_CADET_channel_destroy (channel);
645   }
646 }
647
648
649 /**
650  * Clean up after a client has disconnected
651  *
652  * @param cls closure, unused
653  * @param client the client to clean up after
654  * @param internal_cls our client-specific internal data structure
655  */
656 static void
657 client_disconnect_cb (void *cls,
658                       struct GNUNET_SERVICE_Client *client,
659                       void *internal_cls)
660 {
661   struct Set *set;
662
663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
664               "client disconnected, cleaning up\n");
665   set = set_get (client);
666   if (NULL != set)
667   {
668     set->client = NULL;
669     set_destroy (set);
670     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671                 "Client's set destroyed\n");
672   }
673   struct Listener *listener = listener_get (client);
674   if (NULL != listener)
675   {
676     /* destroy all incoming operations whose client just
677      * got destroyed */
678     //struct Operation *op = incoming_head;
679     /*
680     while (NULL != op)
681     {
682       struct Operation *curr = op;
683       op = op->next;
684       if ( (GNUNET_YES == curr->is_incoming) &&
685            (curr->listener == listener) )
686         incoming_destroy (curr);
687     }
688     */
689     listener->client = NULL;
690     listener_destroy (listener);
691     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
692                 "Client's listener destroyed\n");
693   }
694 }
695
696
697 /**
698  * Suggest the given request to the listener. The listening client can
699  * then accept or reject the remote request.
700  *
701  * @param incoming the incoming peer with the request to suggest
702  * @param listener the listener to suggest the request to
703  */
704 static void
705 incoming_suggest (struct Operation *incoming,
706                   struct Listener *listener)
707 {
708   struct GNUNET_MQ_Envelope *mqm;
709   struct GNUNET_SET_RequestMessage *cmsg;
710
711   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
712   GNUNET_assert (NULL != incoming->spec);
713   GNUNET_assert (0 == incoming->suggest_id);
714   incoming->suggest_id = suggest_id++;
715   if (0 == suggest_id)
716     suggest_id++;
717   GNUNET_assert (NULL != incoming->timeout_task);
718   GNUNET_SCHEDULER_cancel (incoming->timeout_task);
719   incoming->timeout_task = NULL;
720   mqm = GNUNET_MQ_msg_nested_mh (cmsg,
721                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
722                                  incoming->spec->context_msg);
723   GNUNET_assert (NULL != mqm);
724   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
725               "Suggesting incoming request with accept id %u to listener\n",
726               incoming->suggest_id);
727   cmsg->accept_id = htonl (incoming->suggest_id);
728   cmsg->peer_id = incoming->spec->peer;
729   GNUNET_MQ_send (listener->client_mq,
730                   mqm);
731 }
732
733
734 /**
735  * Check a request for a set operation from another peer.
736  *
737  * @param cls the operation state
738  * @param msg the received message
739  * @return #GNUNET_OK if the channel should be kept alive,
740  *         #GNUNET_SYSERR to destroy the channel
741  */
742 static int
743 check_incoming_msg (void *cls,
744                     const struct OperationRequestMessage *msg)
745 {
746   struct Operation *op = cls;
747   const struct GNUNET_MessageHeader *nested_context;
748
749   /* double operation request */
750   if (NULL != op->spec)
751   {
752     GNUNET_break_op (0);
753     return GNUNET_SYSERR;
754   }
755   nested_context = GNUNET_MQ_extract_nested_mh (msg);
756   if ( (NULL != nested_context) &&
757        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
758   {
759     GNUNET_break_op (0);
760     return GNUNET_SYSERR;
761   }
762   return GNUNET_OK;
763 }
764
765
766 /**
767  * Handle a request for a set operation from another peer.  Checks if we
768  * have a listener waiting for such a request (and in that case initiates
769  * asking the listener about accepting the connection). If no listener
770  * is waiting, we queue the operation request in hope that a listener
771  * shows up soon (before timeout).
772  *
773  * This msg is expected as the first and only msg handled through the
774  * non-operation bound virtual table, acceptance of this operation replaces
775  * our virtual table and subsequent msgs would be routed differently (as
776  * we then know what type of operation this is).
777  *
778  * @param cls the operation state
779  * @param msg the received message
780  * @return #GNUNET_OK if the channel should be kept alive,
781  *         #GNUNET_SYSERR to destroy the channel
782  */
783 static void
784 handle_incoming_msg (void *cls,
785                      const struct OperationRequestMessage *msg)
786 {
787   struct Operation *op = cls;
788   struct Listener *listener = op->listener;
789   struct OperationSpecification *spec;
790   const struct GNUNET_MessageHeader *nested_context;
791
792   GNUNET_assert (GNUNET_YES == op->is_incoming);
793   spec = GNUNET_new (struct OperationSpecification);
794   nested_context = GNUNET_MQ_extract_nested_mh (msg);
795   /* Make a copy of the nested_context (application-specific context
796      information that is opaque to set) so we can pass it to the
797      listener later on */
798   if (NULL != nested_context)
799     spec->context_msg = GNUNET_copy_message (nested_context);
800   spec->operation = ntohl (msg->operation);
801   spec->app_id = listener->app_id;
802   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
803                                          UINT32_MAX);
804   spec->peer = op->peer;
805   spec->remote_element_count = ntohl (msg->element_count);
806   op->spec = spec;
807   listener = op->listener;
808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809               "Received P2P operation request (op %u, port %s) for active listener\n",
810               (uint32_t) ntohl (msg->operation),
811               GNUNET_h2s (&listener->app_id));
812   incoming_suggest (op,
813                     listener);
814 }
815
816
817 static void
818 execute_add (struct Set *set,
819              const struct GNUNET_MessageHeader *m)
820 {
821   const struct GNUNET_SET_ElementMessage *msg;
822   struct GNUNET_SET_Element el;
823   struct ElementEntry *ee;
824   struct GNUNET_HashCode hash;
825
826   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type));
827
828   msg = (const struct GNUNET_SET_ElementMessage *) m;
829   el.size = ntohs (m->size) - sizeof *msg;
830   el.data = &msg[1];
831   el.element_type = ntohs (msg->element_type);
832   GNUNET_SET_element_hash (&el, &hash);
833
834   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
835                                           &hash);
836
837   if (NULL == ee)
838   {
839     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840                 "Client inserts element %s of size %u\n",
841                 GNUNET_h2s (&hash),
842                 el.size);
843     ee = GNUNET_malloc (el.size + sizeof *ee);
844     ee->element.size = el.size;
845     GNUNET_memcpy (&ee[1],
846             el.data,
847             el.size);
848     ee->element.data = &ee[1];
849     ee->element.element_type = el.element_type;
850     ee->remote = GNUNET_NO;
851     ee->mutations = NULL;
852     ee->mutations_size = 0;
853     ee->element_hash = hash;
854     GNUNET_break (GNUNET_YES ==
855                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
856                                                      &ee->element_hash,
857                                                      ee,
858                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
859   }
860   else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
861   {
862     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863                 "Client inserted element %s of size %u twice (ignored)\n",
864                 GNUNET_h2s (&hash),
865                 el.size);
866
867     /* same element inserted twice */
868     return;
869   }
870
871   {
872     struct MutationEvent mut = {
873       .generation = set->current_generation,
874       .added = GNUNET_YES
875     };
876     GNUNET_array_append (ee->mutations,
877                          ee->mutations_size,
878                          mut);
879   }
880
881   set->vt->add (set->state, ee);
882 }
883
884
885 static void
886 execute_remove (struct Set *set,
887                 const struct GNUNET_MessageHeader *m)
888 {
889   const struct GNUNET_SET_ElementMessage *msg;
890   struct GNUNET_SET_Element el;
891   struct ElementEntry *ee;
892   struct GNUNET_HashCode hash;
893
894   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type));
895
896   msg = (const struct GNUNET_SET_ElementMessage *) m;
897   el.size = ntohs (m->size) - sizeof *msg;
898   el.data = &msg[1];
899   el.element_type = ntohs (msg->element_type);
900   GNUNET_SET_element_hash (&el, &hash);
901   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
902                                           &hash);
903   if (NULL == ee)
904   {
905     /* Client tried to remove non-existing element. */
906     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907                 "Client removes non-existing element of size %u\n",
908                 el.size);
909     return;
910   }
911   if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
912   {
913     /* Client tried to remove element twice */
914     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
915                 "Client removed element of size %u twice (ignored)\n",
916                 el.size);
917     return;
918   }
919   else
920   {
921     struct MutationEvent mut = {
922       .generation = set->current_generation,
923       .added = GNUNET_NO
924     };
925
926     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
927                 "Client removes element of size %u\n",
928                 el.size);
929
930     GNUNET_array_append (ee->mutations,
931                          ee->mutations_size,
932                          mut);
933   }
934   set->vt->remove (set->state, ee);
935 }
936
937
938
939 static void
940 execute_mutation (struct Set *set,
941                   const struct GNUNET_MessageHeader *m)
942 {
943   switch (ntohs (m->type))
944   {
945     case GNUNET_MESSAGE_TYPE_SET_ADD:
946       execute_add (set, m);
947       break;
948     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
949       execute_remove (set, m);
950       break;
951     default:
952       GNUNET_break (0);
953   }
954 }
955
956
957
958 /**
959  * Send the next element of a set to the set's client.  The next element is given by
960  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
961  * are no more elements in the set.  The caller must ensure that the set's iterator is
962  * valid.
963  *
964  * The client will acknowledge each received element with a
965  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
966  * #handle_client_iter_ack() will then trigger the next transmission.
967  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
968  *
969  * @param set set that should send its next element to its client
970  */
971 static void
972 send_client_element (struct Set *set)
973 {
974   int ret;
975   struct ElementEntry *ee;
976   struct GNUNET_MQ_Envelope *ev;
977   struct GNUNET_SET_IterResponseMessage *msg;
978
979   GNUNET_assert (NULL != set->iter);
980
981 again:
982
983   ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
984                                                      NULL,
985                                                      (const void **) &ee);
986   if (GNUNET_NO == ret)
987   {
988     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989                 "Iteration on %p done.\n",
990                 (void *) set);
991     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
992     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
993     set->iter = NULL;
994     set->iteration_id++;
995
996     GNUNET_assert (set->content->iterator_count > 0);
997     set->content->iterator_count -= 1;
998
999     if (0 == set->content->iterator_count)
1000     {
1001       while (NULL != set->content->pending_mutations_head)
1002       {
1003         struct PendingMutation *pm;
1004
1005         pm = set->content->pending_mutations_head;
1006         GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
1007                                      set->content->pending_mutations_tail,
1008                                      pm);
1009         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                     "Executing pending mutation on %p.\n",
1011                     (void *) pm->set);
1012         execute_mutation (pm->set, pm->mutation_message);
1013         GNUNET_free (pm->mutation_message);
1014         GNUNET_free (pm);
1015       }
1016     }
1017
1018   }
1019   else
1020   {
1021     GNUNET_assert (NULL != ee);
1022
1023     if (GNUNET_NO == is_element_of_iteration (ee, set))
1024       goto again;
1025
1026     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027                 "Sending iteration element on %p.\n",
1028                 (void *) set);
1029     ev = GNUNET_MQ_msg_extra (msg,
1030                               ee->element.size,
1031                               GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1032     GNUNET_memcpy (&msg[1],
1033             ee->element.data,
1034             ee->element.size);
1035     msg->element_type = htons (ee->element.element_type);
1036     msg->iteration_id = htons (set->iteration_id);
1037   }
1038   GNUNET_MQ_send (set->client_mq, ev);
1039 }
1040
1041
1042 /**
1043  * Called when a client wants to iterate the elements of a set.
1044  * Checks if we have a set associated with the client and if we
1045  * can right now start an iteration. If all checks out, starts
1046  * sending the elements of the set to the client.
1047  *
1048  * @param cls client that sent the message
1049  * @param m message sent by the client
1050  */
1051 static void
1052 handle_client_iterate (void *cls,
1053                        const struct GNUNET_MessageHeader *m)
1054 {
1055   struct GNUNET_SERVICE_Client *client = cls;
1056   struct Set *set;
1057
1058   set = set_get (client);
1059   if (NULL == set)
1060   {
1061     /* attempt to iterate over a non existing set */
1062     GNUNET_break (0);
1063     GNUNET_SERVICE_client_drop (client);
1064     return;
1065   }
1066   if (NULL != set->iter)
1067   {
1068     /* Only one concurrent iterate-action allowed per set */
1069     GNUNET_break (0);
1070     GNUNET_SERVICE_client_drop (client);
1071     return;
1072   }
1073   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074               "Iterating set %p in gen %u with %u content elements\n",
1075               (void *) set,
1076               set->current_generation,
1077               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1078   GNUNET_SERVICE_client_continue (client);
1079   set->content->iterator_count += 1;
1080   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1081   set->iter_generation = set->current_generation;
1082   send_client_element (set);
1083 }
1084
1085
1086 /**
1087  * Called when a client wants to create a new set.  This is typically
1088  * the first request from a client, and includes the type of set
1089  * operation to be performed.
1090  *
1091  * @param cls client that sent the message
1092  * @param m message sent by the client
1093  */
1094 static void
1095 handle_client_create_set (void *cls,
1096                           const struct GNUNET_SET_CreateMessage *msg)
1097 {
1098   struct GNUNET_SERVICE_Client *client = cls;
1099   struct Set *set;
1100
1101   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1102               "Client created new set (operation %u)\n",
1103               (uint32_t) ntohl (msg->operation));
1104   if (NULL != set_get (client))
1105   {
1106     /* There can only be one set per client */
1107     GNUNET_break (0);
1108     GNUNET_SERVICE_client_drop (client);
1109     return;
1110   }
1111   set = GNUNET_new (struct Set);
1112   switch (ntohl (msg->operation))
1113   {
1114   case GNUNET_SET_OPERATION_INTERSECTION:
1115     set->vt = _GSS_intersection_vt ();
1116     break;
1117   case GNUNET_SET_OPERATION_UNION:
1118     set->vt = _GSS_union_vt ();
1119     break;
1120   default:
1121     GNUNET_free (set);
1122     GNUNET_break (0);
1123     GNUNET_SERVICE_client_drop (client);
1124     return;
1125   }
1126   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1127   set->state = set->vt->create ();
1128   if (NULL == set->state)
1129   {
1130     /* initialization failed (i.e. out of memory) */
1131     GNUNET_free (set);
1132     GNUNET_SERVICE_client_drop (client);
1133     return;
1134   }
1135   set->content = GNUNET_new (struct SetContent);
1136   set->content->refcount = 1;
1137   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1138   set->client = client;
1139   set->client_mq = GNUNET_SERVICE_client_get_mq (client);
1140   GNUNET_CONTAINER_DLL_insert (sets_head,
1141                                sets_tail,
1142                                set);
1143   GNUNET_SERVICE_client_continue (client);
1144 }
1145
1146
1147 /**
1148  * Timeout happens iff:
1149  *  - we suggested an operation to our listener,
1150  *    but did not receive a response in time
1151  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1152  *
1153  * @param cls channel context
1154  * @param tc context information (why was this task triggered now)
1155  */
1156 static void
1157 incoming_timeout_cb (void *cls)
1158 {
1159   struct Operation *incoming = cls;
1160
1161   incoming->timeout_task = NULL;
1162   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1163   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164               "Remote peer's incoming request timed out\n");
1165   incoming_destroy (incoming);
1166 }
1167
1168
1169 /**
1170  * Terminates an incoming operation in case we have not yet received an
1171  * operation request. Called by the channel destruction handler.
1172  *
1173  * @param op the channel context
1174  */
1175 static void
1176 handle_incoming_disconnect (struct Operation *op)
1177 {
1178   GNUNET_assert (GNUNET_YES == op->is_incoming);
1179   /* channel is already dead, incoming_destroy must not
1180    * destroy it ... */
1181   op->channel = NULL;
1182   incoming_destroy (op);
1183   op->vt = NULL;
1184 }
1185
1186
1187 /**
1188  * Method called whenever another peer has added us to a channel the
1189  * other peer initiated.  Only called (once) upon reception of data
1190  * from a channel we listen on.
1191  *
1192  * The channel context represents the operation itself and gets added
1193  * to a DLL, from where it gets looked up when our local listener
1194  * client responds to a proposed/suggested operation or connects and
1195  * associates with this operation.
1196  *
1197  * @param cls closure
1198  * @param channel new handle to the channel
1199  * @param source peer that started the channel
1200  * @return initial channel context for the channel
1201  *         returns NULL on error
1202  */
1203 static void *
1204 channel_new_cb (void *cls,
1205                 struct GNUNET_CADET_Channel *channel,
1206                 const struct GNUNET_PeerIdentity *source)
1207 {
1208   static const struct SetVT incoming_vt = {
1209     .peer_disconnect = &handle_incoming_disconnect
1210   };
1211   struct Listener *listener = cls;
1212   struct Operation *incoming;
1213
1214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215               "New incoming channel\n");
1216   incoming = GNUNET_new (struct Operation);
1217   incoming->listener = listener;
1218   incoming->is_incoming = GNUNET_YES;
1219   incoming->peer = *source;
1220   incoming->channel = channel;
1221   incoming->mq = GNUNET_CADET_get_mq (incoming->channel);
1222   incoming->vt = &incoming_vt;
1223   incoming->timeout_task
1224     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1225                                     &incoming_timeout_cb,
1226                                     incoming);
1227   GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
1228                                     incoming_tail,
1229                                     incoming);
1230   // incoming_suggest (incoming,
1231   //                  listener);
1232   return incoming;
1233 }
1234
1235
1236 /**
1237  * Function called whenever a channel is destroyed.  Should clean up
1238  * any associated state.  It must NOT call
1239  * GNUNET_CADET_channel_destroy() on the channel.
1240  *
1241  * The peer_disconnect function is part of a a virtual table set initially either
1242  * when a peer creates a new channel with us, or once we create
1243  * a new channel ourselves (evaluate).
1244  *
1245  * Once we know the exact type of operation (union/intersection), the vt is
1246  * replaced with an operation specific instance (_GSS_[op]_vt).
1247  *
1248  * @param channel_ctx place where local state associated
1249  *                   with the channel is stored
1250  * @param channel connection to the other end (henceforth invalid)
1251  */
1252 static void
1253 channel_end_cb (void *channel_ctx,
1254                 const struct GNUNET_CADET_Channel *channel)
1255 {
1256   struct Operation *op = channel_ctx;
1257
1258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259               "channel_end_cb called\n");
1260   op->channel = NULL;
1261   op->keep++;
1262   /* the vt can be null if a client already requested canceling op. */
1263   if (NULL != op->vt)
1264   {
1265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266                 "calling peer disconnect due to channel end\n");
1267     op->vt->peer_disconnect (op);
1268   }
1269   op->keep--;
1270   if (0 == op->keep)
1271   {
1272     /* cadet will never call us with the context again! */
1273     GNUNET_free (op);
1274   }
1275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276               "channel_end_cb finished\n");
1277 }
1278
1279
1280 /**
1281  * Function called whenever an MQ-channel's transmission window size changes.
1282  *
1283  * The first callback in an outgoing channel will be with a non-zero value
1284  * and will mean the channel is connected to the destination.
1285  *
1286  * For an incoming channel it will be called immediately after the
1287  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1288  *
1289  * @param cls Channel closure.
1290  * @param channel Connection to the other end (henceforth invalid).
1291  * @param window_size New window size. If the is more messages than buffer size
1292  *                    this value will be negative..
1293  */
1294 static void
1295 channel_window_cb (void *cls,
1296                    const struct GNUNET_CADET_Channel *channel,
1297                    int window_size)
1298 {
1299   /* FIXME: not implemented, we could do flow control here... */
1300 }
1301
1302
1303 /**
1304  * Called when a client wants to create a new listener.
1305  *
1306  * @param cls client that sent the message
1307  * @param msg message sent by the client
1308  */
1309 static void
1310 handle_client_listen (void *cls,
1311                       const struct GNUNET_SET_ListenMessage *msg)
1312 {
1313   struct GNUNET_SERVICE_Client *client = cls;
1314   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1315     GNUNET_MQ_hd_var_size (incoming_msg,
1316                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1317                            struct OperationRequestMessage,
1318                            NULL),
1319     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1320                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1321                            struct IBFMessage,
1322                            NULL),
1323     GNUNET_MQ_hd_var_size (union_p2p_elements,
1324                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1325                            struct GNUNET_SET_ElementMessage,
1326                            NULL),
1327     GNUNET_MQ_hd_var_size (union_p2p_offer,
1328                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1329                            struct GNUNET_MessageHeader,
1330                            NULL),
1331     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1332                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1333                            struct InquiryMessage,
1334                            NULL),
1335     GNUNET_MQ_hd_var_size (union_p2p_demand,
1336                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1337                            struct GNUNET_MessageHeader,
1338                            NULL),
1339     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1340                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1341                              struct GNUNET_MessageHeader,
1342                              NULL),
1343     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1344                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1345                              struct GNUNET_MessageHeader,
1346                              NULL),
1347     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1348                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1349                              struct GNUNET_MessageHeader,
1350                              NULL),
1351     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1352                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1353                            struct StrataEstimatorMessage,
1354                            NULL),
1355     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1356                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1357                            struct StrataEstimatorMessage,
1358                            NULL),
1359     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1360                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1361                            struct GNUNET_SET_ElementMessage,
1362                            NULL),
1363     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1364                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1365                              struct IntersectionElementInfoMessage,
1366                              NULL),
1367     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1368                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1369                            struct BFMessage,
1370                            NULL),
1371     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1372                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1373                              struct IntersectionDoneMessage,
1374                              NULL),
1375     GNUNET_MQ_handler_end ()
1376   };
1377   struct Listener *listener;
1378
1379   if (NULL != listener_get (client))
1380   {
1381     /* max. one active listener per client! */
1382     GNUNET_break (0);
1383     GNUNET_SERVICE_client_drop (client);
1384     return;
1385   }
1386   listener = GNUNET_new (struct Listener);
1387   listener->client = client;
1388   listener->client_mq = GNUNET_SERVICE_client_get_mq (client);
1389   listener->app_id = msg->app_id;
1390   listener->operation = ntohl (msg->operation);
1391   GNUNET_CONTAINER_DLL_insert_tail (listeners_head,
1392                                     listeners_tail,
1393                                     listener);
1394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395               "New listener created (op %u, port %s)\n",
1396               listener->operation,
1397               GNUNET_h2s (&listener->app_id));
1398   listener->open_port = GNUNET_CADET_open_porT (cadet,
1399                                                 &msg->app_id,
1400                                                 &channel_new_cb,
1401                                                 listener,
1402                                                 &channel_window_cb,
1403                                                 &channel_end_cb,
1404                                                 cadet_handlers);
1405   /* check for existing incoming requests the listener might be interested in */
1406   for (struct Operation *op = incoming_head; NULL != op; op = op->next)
1407   {
1408     if (NULL == op->spec)
1409       continue; /* no details available yet */
1410     if (0 != op->suggest_id)
1411       continue; /* this one has been already suggested to a listener */
1412     if (listener->operation != op->spec->operation)
1413       continue; /* incompatible operation */
1414     if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
1415                                      &op->spec->app_id))
1416       continue; /* incompatible appliation */
1417     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418                 "Found matching existing request\n");
1419     incoming_suggest (op,
1420                       listener);
1421   }
1422   GNUNET_SERVICE_client_continue (client);
1423 }
1424
1425
1426 /**
1427  * Called when the listening client rejects an operation
1428  * request by another peer.
1429  *
1430  * @param cls client that sent the message
1431  * @param msg message sent by the client
1432  */
1433 static void
1434 handle_client_reject (void *cls,
1435                       const struct GNUNET_SET_RejectMessage *msg)
1436 {
1437   struct GNUNET_SERVICE_Client *client = cls;
1438   struct Operation *incoming;
1439
1440   incoming = get_incoming (ntohl (msg->accept_reject_id));
1441   if (NULL == incoming)
1442   {
1443     /* no matching incoming operation for this reject;
1444        could be that the other peer already disconnected... */
1445     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1446                 "Client rejected unknown operation %u\n",
1447                 (unsigned int) ntohl (msg->accept_reject_id));
1448     GNUNET_SERVICE_client_continue (client);
1449     return;
1450   }
1451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1452               "Peer request (op %u, app %s) rejected by client\n",
1453               incoming->spec->operation,
1454               GNUNET_h2s (&incoming->spec->app_id));
1455   GNUNET_CADET_channel_destroy (incoming->channel);
1456   GNUNET_SERVICE_client_continue (client);
1457 }
1458
1459
1460 /**
1461  * Called when a client wants to add or remove an element to a set it inhabits.
1462  *
1463  * @param cls client that sent the message
1464  * @param m message sent by the client
1465  */
1466 static int
1467 check_client_mutation (void *cls,
1468                        const struct GNUNET_MessageHeader *m)
1469 {
1470   /* FIXME: any check we might want to do here? */
1471   return GNUNET_OK;
1472 }
1473
1474
1475 /**
1476  * Called when a client wants to add or remove an element to a set it inhabits.
1477  *
1478  * @param cls client that sent the message
1479  * @param m message sent by the client
1480  */
1481 static void
1482 handle_client_mutation (void *cls,
1483                         const struct GNUNET_MessageHeader *m)
1484 {
1485   struct GNUNET_SERVICE_Client *client = cls;
1486   struct Set *set;
1487
1488   set = set_get (client);
1489   if (NULL == set)
1490   {
1491     /* client without a set requested an operation */
1492     GNUNET_break (0);
1493     GNUNET_SERVICE_client_drop (client);
1494     return;
1495   }
1496   GNUNET_SERVICE_client_continue (client);
1497
1498   if (0 != set->content->iterator_count)
1499   {
1500     struct PendingMutation *pm;
1501
1502     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1503                 "Scheduling mutation on set\n");
1504     pm = GNUNET_new (struct PendingMutation);
1505     pm->mutation_message = GNUNET_copy_message (m);
1506     pm->set = set;
1507     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1508                                       set->content->pending_mutations_tail,
1509                                       pm);
1510     return;
1511   }
1512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1513               "Executing mutation on set\n");
1514   execute_mutation (set,
1515                     m);
1516 }
1517
1518
1519 /**
1520  * Advance the current generation of a set,
1521  * adding exclusion ranges if necessary.
1522  *
1523  * @param set the set where we want to advance the generation
1524  */
1525 static void
1526 advance_generation (struct Set *set)
1527 {
1528   struct GenerationRange r;
1529
1530   if (set->current_generation == set->content->latest_generation)
1531   {
1532     set->content->latest_generation++;
1533     set->current_generation++;
1534     return;
1535   }
1536
1537   GNUNET_assert (set->current_generation < set->content->latest_generation);
1538
1539   r.start = set->current_generation + 1;
1540   r.end = set->content->latest_generation + 1;
1541   set->content->latest_generation = r.end;
1542   set->current_generation = r.end;
1543   GNUNET_array_append (set->excluded_generations,
1544                        set->excluded_generations_size,
1545                        r);
1546 }
1547
1548
1549 /**
1550  * Called when a client wants to initiate a set operation with another
1551  * peer.  Initiates the CADET connection to the listener and sends the
1552  * request.
1553  *
1554  * @param cls client that sent the message
1555  * @param msg message sent by the client
1556  * @return #GNUNET_OK if the message is well-formed
1557  */
1558 static int
1559 check_client_evaluate (void *cls,
1560                         const struct GNUNET_SET_EvaluateMessage *msg)
1561 {
1562   /* FIXME: suboptimal, even if the context below could be NULL,
1563      there are malformed messages this does not check for... */
1564   return GNUNET_OK;
1565 }
1566
1567
1568 /**
1569  * Called when a client wants to initiate a set operation with another
1570  * peer.  Initiates the CADET connection to the listener and sends the
1571  * request.
1572  *
1573  * @param cls client that sent the message
1574  * @param msg message sent by the client
1575  */
1576 static void
1577 handle_client_evaluate (void *cls,
1578                         const struct GNUNET_SET_EvaluateMessage *msg)
1579 {
1580   struct GNUNET_SERVICE_Client *client = cls;
1581   struct Operation *op = GNUNET_new (struct Operation);
1582   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1583     GNUNET_MQ_hd_var_size (incoming_msg,
1584                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1585                            struct OperationRequestMessage,
1586                            op),
1587     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1588                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1589                            struct IBFMessage,
1590                            op),
1591     GNUNET_MQ_hd_var_size (union_p2p_elements,
1592                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1593                            struct GNUNET_SET_ElementMessage,
1594                            op),
1595     GNUNET_MQ_hd_var_size (union_p2p_offer,
1596                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1597                            struct GNUNET_MessageHeader,
1598                            op),
1599     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1600                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1601                            struct InquiryMessage,
1602                            op),
1603     GNUNET_MQ_hd_var_size (union_p2p_demand,
1604                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1605                            struct GNUNET_MessageHeader,
1606                            op),
1607     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1608                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1609                              struct GNUNET_MessageHeader,
1610                              op),
1611     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1612                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1613                              struct GNUNET_MessageHeader,
1614                              op),
1615     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1616                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1617                              struct GNUNET_MessageHeader,
1618                              op),
1619     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1620                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1621                            struct StrataEstimatorMessage,
1622                            op),
1623     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1624                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1625                            struct StrataEstimatorMessage,
1626                            op),
1627     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1628                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1629                            struct GNUNET_SET_ElementMessage,
1630                            op),
1631     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1632                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1633                              struct IntersectionElementInfoMessage,
1634                              op),
1635     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1636                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1637                            struct BFMessage,
1638                            op),
1639     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1640                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1641                              struct IntersectionDoneMessage,
1642                              op),
1643     GNUNET_MQ_handler_end ()
1644   };
1645   struct Set *set;
1646   struct OperationSpecification *spec;
1647   const struct GNUNET_MessageHeader *context;
1648
1649   set = set_get (client);
1650   if (NULL == set)
1651   {
1652     GNUNET_break (0);
1653     GNUNET_free (op);
1654     GNUNET_SERVICE_client_drop (client);
1655     return;
1656   }
1657   spec = GNUNET_new (struct OperationSpecification);
1658   spec->operation = set->operation;
1659   spec->app_id = msg->app_id;
1660   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1661                                          UINT32_MAX);
1662   spec->peer = msg->target_peer;
1663   spec->set = set;
1664   spec->result_mode = ntohl (msg->result_mode);
1665   spec->client_request_id = ntohl (msg->request_id);
1666   spec->byzantine = msg->byzantine;
1667   spec->byzantine_lower_bound = msg->byzantine_lower_bound;
1668   spec->force_full = msg->force_full;
1669   spec->force_delta = msg->force_delta;
1670   context = GNUNET_MQ_extract_nested_mh (msg);
1671   op->spec = spec;
1672
1673   // Advance generation values, so that
1674   // mutations won't interfer with the running operation.
1675   op->generation_created = set->current_generation;
1676   advance_generation (set);
1677   op->operation = set->operation;
1678   op->vt = set->vt;
1679   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1680                                set->ops_tail,
1681                                op);
1682   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683               "Creating new CADET channel to port %s\n",
1684               GNUNET_h2s (&msg->app_id));
1685   op->channel = GNUNET_CADET_channel_creatE (cadet,
1686                                              op,
1687                                              &msg->target_peer,
1688                                              &msg->app_id,
1689                                              GNUNET_CADET_OPTION_RELIABLE,
1690                                              &channel_window_cb,
1691                                              &channel_end_cb,
1692                                              cadet_handlers);
1693   op->mq = GNUNET_CADET_get_mq (op->channel);
1694   set->vt->evaluate (op,
1695                      context);
1696   GNUNET_SERVICE_client_continue (client);
1697 }
1698
1699
1700 /**
1701  * Handle an ack from a client, and send the next element. Note
1702  * that we only expect acks for set elements, not after the
1703  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1704  *
1705  * @param cls client the client
1706  * @param ack the message
1707  */
1708 static void
1709 handle_client_iter_ack (void *cls,
1710                         const struct GNUNET_SET_IterAckMessage *ack)
1711 {
1712   struct GNUNET_SERVICE_Client *client = cls;
1713   struct Set *set;
1714
1715   set = set_get (client);
1716   if (NULL == set)
1717   {
1718     /* client without a set acknowledged receiving a value */
1719     GNUNET_break (0);
1720     GNUNET_SERVICE_client_drop (client);
1721     return;
1722   }
1723   if (NULL == set->iter)
1724   {
1725     /* client sent an ack, but we were not expecting one (as
1726        set iteration has finished) */
1727     GNUNET_break (0);
1728     GNUNET_SERVICE_client_drop (client);
1729     return;
1730   }
1731   GNUNET_SERVICE_client_continue (client);
1732   if (ntohl (ack->send_more))
1733   {
1734     send_client_element (set);
1735   }
1736   else
1737   {
1738     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1739     set->iter = NULL;
1740     set->iteration_id++;
1741   }
1742 }
1743
1744
1745 /**
1746  * Handle a request from the client to copy a set.
1747  *
1748  * @param cls the client
1749  * @param mh the message
1750  */
1751 static void
1752 handle_client_copy_lazy_prepare (void *cls,
1753                                  const struct GNUNET_MessageHeader *mh)
1754 {
1755   struct GNUNET_SERVICE_Client *client = cls;
1756   struct Set *set;
1757   struct LazyCopyRequest *cr;
1758   struct GNUNET_MQ_Envelope *ev;
1759   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1760
1761   set = set_get (client);
1762   if (NULL == set)
1763   {
1764     /* client without a set requested an operation */
1765     GNUNET_break (0);
1766     GNUNET_SERVICE_client_drop (client);
1767     return;
1768   }
1769
1770   cr = GNUNET_new (struct LazyCopyRequest);
1771
1772   cr->cookie = lazy_copy_cookie;
1773   lazy_copy_cookie += 1;
1774   cr->source_set = set;
1775
1776   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1777                                lazy_copy_tail,
1778                                cr);
1779
1780
1781   ev = GNUNET_MQ_msg (resp_msg,
1782                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1783   resp_msg->cookie = cr->cookie;
1784   GNUNET_MQ_send (set->client_mq, ev);
1785
1786
1787   GNUNET_SERVICE_client_continue (client);
1788
1789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790               "Client requested lazy copy\n");
1791 }
1792
1793
1794 /**
1795  * Handle a request from the client to connect to a copy of a set.
1796  *
1797  * @param cls the client
1798  * @param msg the message
1799  */
1800 static void
1801 handle_client_copy_lazy_connect (void *cls,
1802                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1803 {
1804   struct GNUNET_SERVICE_Client *client = cls;
1805   struct LazyCopyRequest *cr;
1806   struct Set *set;
1807   int found;
1808
1809   if (NULL != set_get (client))
1810   {
1811     /* There can only be one set per client */
1812     GNUNET_break (0);
1813     GNUNET_SERVICE_client_drop (client);
1814     return;
1815   }
1816
1817   found = GNUNET_NO;
1818
1819   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1820   {
1821     if (cr->cookie == msg->cookie)
1822     {
1823       found = GNUNET_YES;
1824       break;
1825     }
1826   }
1827
1828   if (GNUNET_NO == found)
1829   {
1830     /* client asked for copy with cookie we don't know */
1831     GNUNET_break (0);
1832     GNUNET_SERVICE_client_drop (client);
1833     return;
1834   }
1835
1836   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1837                                lazy_copy_tail,
1838                                cr);
1839
1840   set = GNUNET_new (struct Set);
1841
1842   switch (cr->source_set->operation)
1843   {
1844   case GNUNET_SET_OPERATION_INTERSECTION:
1845     set->vt = _GSS_intersection_vt ();
1846     break;
1847   case GNUNET_SET_OPERATION_UNION:
1848     set->vt = _GSS_union_vt ();
1849     break;
1850   default:
1851     GNUNET_assert (0);
1852     return;
1853   }
1854
1855   if (NULL == set->vt->copy_state)
1856   {
1857     /* Lazy copy not supported for this set operation */
1858     GNUNET_break (0);
1859     GNUNET_free (set);
1860     GNUNET_free (cr);
1861     GNUNET_SERVICE_client_drop (client);
1862     return;
1863   }
1864
1865   set->operation = cr->source_set->operation;
1866   set->state = set->vt->copy_state (cr->source_set);
1867   set->content = cr->source_set->content;
1868   set->content->refcount += 1;
1869
1870   set->current_generation = cr->source_set->current_generation;
1871   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1872   set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations,
1873                                              set->excluded_generations_size * sizeof (struct GenerationRange));
1874
1875   /* Advance the generation of the new set, so that mutations to the
1876      of the cloned set and the source set are independent. */
1877   advance_generation (set);
1878
1879
1880   set->client = client;
1881   set->client_mq = GNUNET_SERVICE_client_get_mq (client);
1882   GNUNET_CONTAINER_DLL_insert (sets_head,
1883                                sets_tail,
1884                                set);
1885
1886   GNUNET_free (cr);
1887
1888   GNUNET_SERVICE_client_continue (client);
1889
1890   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1891               "Client connected to lazy set\n");
1892 }
1893
1894
1895 /**
1896  * Handle a request from the client to cancel a running set operation.
1897  *
1898  * @param cls the client
1899  * @param msg the message
1900  */
1901 static void
1902 handle_client_cancel (void *cls,
1903                       const struct GNUNET_SET_CancelMessage *msg)
1904 {
1905   struct GNUNET_SERVICE_Client *client = cls;
1906   struct Set *set;
1907   struct Operation *op;
1908   int found;
1909
1910   set = set_get (client);
1911   if (NULL == set)
1912   {
1913     /* client without a set requested an operation */
1914     GNUNET_break (0);
1915     GNUNET_SERVICE_client_drop (client);
1916     return;
1917   }
1918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919               "Client requested cancel for op %u\n",
1920               (uint32_t) ntohl (msg->request_id));
1921   found = GNUNET_NO;
1922   for (op = set->ops_head; NULL != op; op = op->next)
1923   {
1924     if (op->spec->client_request_id == ntohl (msg->request_id))
1925     {
1926       found = GNUNET_YES;
1927       break;
1928     }
1929   }
1930   if (GNUNET_NO == found)
1931   {
1932     /* It may happen that the operation was already destroyed due to
1933      * the other peer disconnecting.  The client may not know about this
1934      * yet and try to cancel the (just barely non-existent) operation.
1935      * So this is not a hard error.
1936      */
1937     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1938                 "Client canceled non-existent op\n");
1939   }
1940   else
1941   {
1942     _GSS_operation_destroy (op,
1943                             GNUNET_YES);
1944   }
1945   GNUNET_SERVICE_client_continue (client);
1946 }
1947
1948
1949 /**
1950  * Handle a request from the client to accept a set operation that
1951  * came from a remote peer.  We forward the accept to the associated
1952  * operation for handling
1953  *
1954  * @param cls the client
1955  * @param msg the message
1956  */
1957 static void
1958 handle_client_accept (void *cls,
1959                       const struct GNUNET_SET_AcceptMessage *msg)
1960 {
1961   struct GNUNET_SERVICE_Client *client = cls;
1962   struct Set *set;
1963   struct Operation *op;
1964   struct GNUNET_SET_ResultMessage *result_message;
1965   struct GNUNET_MQ_Envelope *ev;
1966
1967   set = set_get (client);
1968   if (NULL == set)
1969   {
1970     /* client without a set requested to accept */
1971     GNUNET_break (0);
1972     GNUNET_SERVICE_client_drop (client);
1973     return;
1974   }
1975   op = get_incoming (ntohl (msg->accept_reject_id));
1976   if (NULL == op)
1977   {
1978     /* It is not an error if the set op does not exist -- it may
1979      * have been destroyed when the partner peer disconnected. */
1980     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981                 "Client accepted request that is no longer active\n");
1982     ev = GNUNET_MQ_msg (result_message,
1983                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1984     result_message->request_id = msg->request_id;
1985     result_message->element_type = 0;
1986     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1987     GNUNET_MQ_send (set->client_mq, ev);
1988     GNUNET_SERVICE_client_continue (client);
1989     return;
1990   }
1991
1992   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993               "Client accepting request %u\n",
1994               (uint32_t) ntohl (msg->accept_reject_id));
1995   GNUNET_assert (GNUNET_YES == op->is_incoming);
1996   op->is_incoming = GNUNET_NO;
1997   GNUNET_CONTAINER_DLL_remove (incoming_head,
1998                                incoming_tail,
1999                                op);
2000   op->spec->set = set;
2001   GNUNET_CONTAINER_DLL_insert (set->ops_head,
2002                                set->ops_tail,
2003                                op);
2004   op->spec->client_request_id = ntohl (msg->request_id);
2005   op->spec->result_mode = ntohl (msg->result_mode);
2006   op->spec->byzantine = msg->byzantine;
2007   op->spec->byzantine_lower_bound = msg->byzantine_lower_bound;
2008   op->spec->force_full = msg->force_full;
2009   op->spec->force_delta = msg->force_delta;
2010
2011   // Advance generation values, so that
2012   // mutations won't interfer with the running operation.
2013   op->generation_created = set->current_generation;
2014   advance_generation (set);
2015
2016   op->vt = set->vt;
2017   op->operation = set->operation;
2018   op->vt->accept (op);
2019   GNUNET_SERVICE_client_continue (client);
2020 }
2021
2022
2023 /**
2024  * Called to clean up, after a shutdown has been requested.
2025  *
2026  * @param cls closure
2027  */
2028 static void
2029 shutdown_task (void *cls)
2030 {
2031   while (NULL != incoming_head)
2032     incoming_destroy (incoming_head);
2033   while (NULL != listeners_head)
2034     listener_destroy (listeners_head);
2035   while (NULL != sets_head)
2036     set_destroy (sets_head);
2037
2038   /* it's important to destroy cadet at the end, as all channels
2039    * must be destroyed before the cadet handle! */
2040   if (NULL != cadet)
2041   {
2042     GNUNET_CADET_disconnect (cadet);
2043     cadet = NULL;
2044   }
2045   GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
2046   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047               "handled shutdown request\n");
2048 }
2049
2050
2051 /**
2052  * Function called by the service's run
2053  * method to run service-specific setup code.
2054  *
2055  * @param cls closure
2056  * @param cfg configuration to use
2057  * @param service the initialized service
2058  */
2059 static void
2060 run (void *cls,
2061      const struct GNUNET_CONFIGURATION_Handle *cfg,
2062      struct GNUNET_SERVICE_Handle *service)
2063 {
2064   configuration = cfg;
2065   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2066                                  NULL);
2067   _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
2068   cadet = GNUNET_CADET_connecT (cfg);
2069   if (NULL == cadet)
2070   {
2071     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2072                 _("Could not connect to CADET service\n"));
2073     return;
2074   }
2075 }
2076
2077
2078 /**
2079  * Define "main" method using service macro.
2080  */
2081 GNUNET_SERVICE_MAIN
2082 ("set",
2083  GNUNET_SERVICE_OPTION_NONE,
2084  &run,
2085  &client_connect_cb,
2086  &client_disconnect_cb,
2087  NULL,
2088  GNUNET_MQ_hd_fixed_size (client_accept,
2089                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2090                           struct GNUNET_SET_AcceptMessage,
2091                           NULL),
2092  GNUNET_MQ_hd_fixed_size (client_iter_ack,
2093                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2094                           struct GNUNET_SET_IterAckMessage,
2095                           NULL),
2096  GNUNET_MQ_hd_var_size (client_mutation,
2097                         GNUNET_MESSAGE_TYPE_SET_ADD,
2098                         struct GNUNET_MessageHeader,
2099                         NULL),
2100  GNUNET_MQ_hd_fixed_size (client_create_set,
2101                           GNUNET_MESSAGE_TYPE_SET_CREATE,
2102                           struct GNUNET_SET_CreateMessage,
2103                           NULL),
2104  GNUNET_MQ_hd_fixed_size (client_iterate,
2105                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2106                           struct GNUNET_MessageHeader,
2107                           NULL),
2108  GNUNET_MQ_hd_var_size (client_evaluate,
2109                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2110                         struct GNUNET_SET_EvaluateMessage,
2111                         NULL),
2112  GNUNET_MQ_hd_fixed_size (client_listen,
2113                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
2114                           struct GNUNET_SET_ListenMessage,
2115                           NULL),
2116  GNUNET_MQ_hd_fixed_size (client_reject,
2117                           GNUNET_MESSAGE_TYPE_SET_REJECT,
2118                           struct GNUNET_SET_RejectMessage,
2119                           NULL),
2120  GNUNET_MQ_hd_var_size (client_mutation,
2121                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
2122                         struct GNUNET_MessageHeader,
2123                         NULL),
2124  GNUNET_MQ_hd_fixed_size (client_cancel,
2125                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2126                           struct GNUNET_SET_CancelMessage,
2127                           NULL),
2128  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2129                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2130                           struct GNUNET_MessageHeader,
2131                           NULL),
2132  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2133                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2134                           struct GNUNET_SET_CopyLazyConnectMessage,
2135                           NULL),
2136  GNUNET_MQ_handler_end ());
2137
2138
2139 /* end of gnunet-service-set.c */