fix state when requesting full IBF
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013, 2014, 2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_protocol.h"
28 #include "gnunet_statistics_service.h"
29
30 /**
31  * How long do we hold on to an incoming channel if there is
32  * no local listener before giving up?
33  */
34 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35
36 /**
37  * A listener is inhabited by a client, and waits for evaluation
38  * requests from remote peers.
39  */
40 struct Listener
41 {
42   /**
43    * Listeners are held in a doubly linked list.
44    */
45   struct Listener *next;
46
47   /**
48    * Listeners are held in a doubly linked list.
49    */
50   struct Listener *prev;
51
52   /**
53    * Client that owns the listener.
54    * Only one client may own a listener.
55    */
56   struct GNUNET_SERVICE_Client *client;
57
58   /**
59    * Message queue for the client
60    */
61   struct GNUNET_MQ_Handle *client_mq;
62
63   /**
64    * Application ID for the operation, used to distinguish
65    * multiple operations of the same type with the same peer.
66    */
67   struct GNUNET_HashCode app_id;
68
69   /**
70    * The port we are listening on with CADET.
71    */
72   struct GNUNET_CADET_Port *open_port;
73
74   /**
75    * The type of the operation.
76    */
77   enum GNUNET_SET_OperationType operation;
78 };
79
80
81 struct LazyCopyRequest
82 {
83   struct Set *source_set;
84   uint32_t cookie;
85
86   struct LazyCopyRequest *prev;
87   struct LazyCopyRequest *next;
88 };
89
90
91 /**
92  * Configuration of our local peer.
93  */
94 static const struct GNUNET_CONFIGURATION_Handle *configuration;
95
96 /**
97  * Handle to the cadet service, used to listen for and connect to
98  * remote peers.
99  */
100 static struct GNUNET_CADET_Handle *cadet;
101
102 /**
103  * Sets are held in a doubly linked list.
104  */
105 static struct Set *sets_head;
106
107 /**
108  * Sets are held in a doubly linked list.
109  */
110 static struct Set *sets_tail;
111
112 /**
113  * Listeners are held in a doubly linked list.
114  */
115 static struct Listener *listeners_head;
116
117 /**
118  * Listeners are held in a doubly linked list.
119  */
120 static struct Listener *listeners_tail;
121
122 /**
123  * Incoming sockets from remote peers are held in a doubly linked
124  * list.
125  */
126 static struct Operation *incoming_head;
127
128 /**
129  * Incoming sockets from remote peers are held in a doubly linked
130  * list.
131  */
132 static struct Operation *incoming_tail;
133
134 static struct LazyCopyRequest *lazy_copy_head;
135 static struct LazyCopyRequest *lazy_copy_tail;
136
137 static uint32_t lazy_copy_cookie = 1;
138
139 /**
140  * Counter for allocating unique IDs for clients, used to identify
141  * incoming operation requests from remote peers, that the client can
142  * choose to accept or refuse.
143  */
144 static uint32_t suggest_id = 1;
145
146 /**
147  * Statistics handle.
148  */
149 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
150
151
152 /**
153  * Get set that is owned by the given client, if any.
154  *
155  * @param client client to look for
156  * @return set that the client owns, NULL if the client
157  *         does not own a set
158  */
159 static struct Set *
160 set_get (struct GNUNET_SERVICE_Client *client)
161 {
162   struct Set *set;
163
164   for (set = sets_head; NULL != set; set = set->next)
165     if (set->client == client)
166       return set;
167   return NULL;
168 }
169
170
171 /**
172  * Get the listener associated with the given client, if any.
173  *
174  * @param client the client
175  * @return listener associated with the client, NULL
176  *         if there isn't any
177  */
178 static struct Listener *
179 listener_get (struct GNUNET_SERVICE_Client *client)
180 {
181   struct Listener *listener;
182
183   for (listener = listeners_head; NULL != listener; listener = listener->next)
184     if (listener->client == client)
185       return listener;
186   return NULL;
187 }
188
189
190 /**
191  * Get the incoming socket associated with the given id.
192  *
193  * @param id id to look for
194  * @return the incoming socket associated with the id,
195  *         or NULL if there is none
196  */
197 static struct Operation *
198 get_incoming (uint32_t id)
199 {
200   struct Operation *op;
201
202   for (op = incoming_head; NULL != op; op = op->next)
203     if (op->suggest_id == id)
204     {
205       GNUNET_assert (GNUNET_YES == op->is_incoming);
206       return op;
207     }
208   return NULL;
209 }
210
211
212 /**
213  * Destroy a listener, free all resources associated with it.
214  *
215  * @param listener listener to destroy
216  */
217 static void
218 listener_destroy (struct Listener *listener)
219 {
220   /* If the client is not dead yet, destroy it.
221    * The client's destroy callback will destroy the listener again. */
222   if (NULL != listener->client)
223   {
224     struct GNUNET_SERVICE_Client *client = listener->client;
225
226     listener->client = NULL;
227     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
228                 "Disconnecting listener client\n");
229     GNUNET_SERVICE_client_drop (client);
230     return;
231   }
232   GNUNET_CADET_close_port (listener->open_port);
233   GNUNET_CONTAINER_DLL_remove (listeners_head,
234                                listeners_tail,
235                                listener);
236   GNUNET_free (listener);
237 }
238
239
240 /**
241  * Context for the #garbage_collect_cb().
242  */
243 struct GarbageContext
244 {
245
246   /**
247    * Map for which we are garbage collecting removed elements.
248    */
249   struct GNUNET_CONTAINER_MultiHashMap *map;
250
251   /**
252    * Lowest generation for which an operation is still pending.
253    */
254   unsigned int min_op_generation;
255
256   /**
257    * Largest generation for which an operation is still pending.
258    */
259   unsigned int max_op_generation;
260
261 };
262
263
264 /**
265  * Function invoked to check if an element can be removed from
266  * the set's history because it is no longer needed.
267  *
268  * @param cls the `struct GarbageContext *`
269  * @param key key of the element in the map
270  * @param value the `struct ElementEntry *`
271  * @return #GNUNET_OK (continue to iterate)
272  */
273 static int
274 garbage_collect_cb (void *cls,
275                     const struct GNUNET_HashCode *key,
276                     void *value)
277 {
278   //struct GarbageContext *gc = cls;
279   //struct ElementEntry *ee = value;
280
281   //if (GNUNET_YES != ee->removed)
282   //  return GNUNET_OK;
283   //if ( (gc->max_op_generation < ee->generation_added) ||
284   //     (ee->generation_removed > gc->min_op_generation) )
285   //{
286   //  GNUNET_assert (GNUNET_YES ==
287   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
288   //                                                       key,
289   //                                                       ee));
290   //  GNUNET_free (ee);
291   //}
292   return GNUNET_OK;
293 }
294
295
296 /**
297  * Collect and destroy elements that are not needed anymore, because
298  * their lifetime (as determined by their generation) does not overlap
299  * with any active set operation.
300  *
301  * @param set set to garbage collect
302  */
303 static void
304 collect_generation_garbage (struct Set *set)
305 {
306   struct Operation *op;
307   struct GarbageContext gc;
308
309   gc.min_op_generation = UINT_MAX;
310   gc.max_op_generation = 0;
311   for (op = set->ops_head; NULL != op; op = op->next)
312   {
313     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
314                                        op->generation_created);
315     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
316                                        op->generation_created);
317   }
318   gc.map = set->content->elements;
319   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
320                                          &garbage_collect_cb,
321                                          &gc);
322 }
323
324
325 static int
326 is_excluded_generation (unsigned int generation,
327                         struct GenerationRange *excluded,
328                         unsigned int excluded_size)
329 {
330   unsigned int i;
331
332   for (i = 0; i < excluded_size; i++)
333   {
334     if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
335       return GNUNET_YES;
336   }
337
338   return GNUNET_NO;
339 }
340
341
342 static int
343 is_element_of_generation (struct ElementEntry *ee,
344                           unsigned int query_generation,
345                           struct GenerationRange *excluded,
346                           unsigned int excluded_size)
347 {
348   struct MutationEvent *mut;
349   int is_present;
350   unsigned int i;
351
352   GNUNET_assert (NULL != ee->mutations);
353
354   if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size))
355   {
356     GNUNET_break (0);
357     return GNUNET_NO;
358   }
359
360   is_present = GNUNET_NO;
361
362   /* Could be made faster with binary search, but lists
363      are small, so why bother. */
364   for (i = 0; i < ee->mutations_size; i++)
365   {
366     mut = &ee->mutations[i];
367
368     if (mut->generation > query_generation)
369     {
370       /* The mutation doesn't apply to our generation
371          anymore.  We can'b break here, since mutations aren't
372          sorted by generation. */
373       continue;
374     }
375
376     if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size))
377     {
378       /* The generation is excluded (because it belongs to another
379          fork via a lazy copy) and thus mutations aren't considered
380          for membership testing. */
381       continue;
382     }
383
384     /* This would be an inconsistency in how we manage mutations. */
385     if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) )
386       GNUNET_assert (0);
387
388     /* Likewise. */
389     if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) )
390       GNUNET_assert (0);
391
392     is_present = mut->added;
393   }
394
395   return is_present;
396 }
397
398
399 int
400 _GSS_is_element_of_set (struct ElementEntry *ee,
401                         struct Set *set)
402 {
403   return is_element_of_generation (ee,
404                                    set->current_generation,
405                                    set->excluded_generations,
406                                    set->excluded_generations_size);
407 }
408
409
410 static int
411 is_element_of_iteration (struct ElementEntry *ee,
412                          struct Set *set)
413 {
414   return is_element_of_generation (ee,
415                                    set->iter_generation,
416                                    set->excluded_generations,
417                                    set->excluded_generations_size);
418 }
419
420
421 int
422 _GSS_is_element_of_operation (struct ElementEntry *ee,
423                               struct Operation *op)
424 {
425   return is_element_of_generation (ee,
426                                    op->generation_created,
427                                    op->spec->set->excluded_generations,
428                                    op->spec->set->excluded_generations_size);
429 }
430
431
432 /**
433  * Destroy the given operation.  Call the implementation-specific
434  * cancel function of the operation.  Disconnects from the remote
435  * peer.  Does not disconnect the client, as there may be multiple
436  * operations per set.
437  *
438  * @param op operation to destroy
439  * @param gc #GNUNET_YES to perform garbage collection on the set
440  */
441 void
442 _GSS_operation_destroy (struct Operation *op,
443                         int gc)
444 {
445   struct Set *set;
446   struct GNUNET_CADET_Channel *channel;
447
448   if (NULL == op->vt)
449   {
450     /* already in #_GSS_operation_destroy() */
451     return;
452   }
453   GNUNET_assert (GNUNET_NO == op->is_incoming);
454   GNUNET_assert (NULL != op->spec);
455   set = op->spec->set;
456   GNUNET_CONTAINER_DLL_remove (set->ops_head,
457                                set->ops_tail,
458                                op);
459   op->vt->cancel (op);
460   op->vt = NULL;
461   if (NULL != op->spec)
462   {
463     if (NULL != op->spec->context_msg)
464     {
465       GNUNET_free (op->spec->context_msg);
466       op->spec->context_msg = NULL;
467     }
468     GNUNET_free (op->spec);
469     op->spec = NULL;
470   }
471   if (NULL != (channel = op->channel))
472   {
473     op->channel = NULL;
474     GNUNET_CADET_channel_destroy (channel);
475   }
476   if (GNUNET_YES == gc)
477     collect_generation_garbage (set);
478   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
479    * there was a channel end handler that will free 'op' on the call stack. */
480 }
481
482
483 /**
484  * Iterator over hash map entries to free element entries.
485  *
486  * @param cls closure
487  * @param key current key code
488  * @param value a `struct ElementEntry *` to be free'd
489  * @return #GNUNET_YES (continue to iterate)
490  */
491 static int
492 destroy_elements_iterator (void *cls,
493                            const struct GNUNET_HashCode *key,
494                            void *value)
495 {
496   struct ElementEntry *ee = value;
497
498   GNUNET_free_non_null (ee->mutations);
499
500   GNUNET_free (ee);
501   return GNUNET_YES;
502 }
503
504
505 /**
506  * Destroy a set, and free all resources and operations associated with it.
507  *
508  * @param set the set to destroy
509  */
510 static void
511 set_destroy (struct Set *set)
512 {
513   if (NULL != set->client)
514   {
515     /* If the client is not dead yet, destroy it.  The client's destroy
516      * callback will call `set_destroy()` again in this case.  We do
517      * this so that the channel end handler still has a valid set handle
518      * to destroy. */
519     struct GNUNET_SERVICE_Client *client = set->client;
520
521     set->client = NULL;
522     GNUNET_SERVICE_client_drop (client);
523     return;
524   }
525   GNUNET_assert (NULL != set->state);
526   while (NULL != set->ops_head)
527     _GSS_operation_destroy (set->ops_head, GNUNET_NO);
528   set->vt->destroy_set (set->state);
529   set->state = NULL;
530   if (NULL != set->iter)
531   {
532     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
533     set->iter = NULL;
534     set->iteration_id++;
535   }
536   {
537     struct SetContent *content;
538     struct PendingMutation *pm;
539     struct PendingMutation *pm_current;
540
541     content = set->content;
542
543     // discard any pending mutations that reference this set
544     pm = content->pending_mutations_head;
545     while (NULL != pm)
546     {
547       pm_current = pm;
548       pm = pm->next;
549       if (pm_current-> set == set)
550         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
551                                      content->pending_mutations_tail,
552                                      pm_current);
553
554     }
555
556     set->content = NULL;
557     GNUNET_assert (0 != content->refcount);
558     content->refcount -= 1;
559     if (0 == content->refcount)
560     {
561       GNUNET_assert (NULL != content->elements);
562       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
563                                              &destroy_elements_iterator,
564                                              NULL);
565       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
566       content->elements = NULL;
567       GNUNET_free (content);
568     }
569   }
570   GNUNET_free_non_null (set->excluded_generations);
571   set->excluded_generations = NULL;
572   GNUNET_CONTAINER_DLL_remove (sets_head,
573                                sets_tail,
574                                set);
575
576   // remove set from pending copy requests
577   {
578     struct LazyCopyRequest *lcr;
579     lcr = lazy_copy_head;
580     while (NULL != lcr)
581     {
582       struct LazyCopyRequest *lcr_current;
583       lcr_current = lcr;
584       lcr = lcr->next;
585       if (lcr_current->source_set == set)
586         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
587                                      lazy_copy_tail,
588                                      lcr_current);
589     }
590   }
591
592   GNUNET_free (set);
593 }
594
595
596 /**
597  * Callback called when a client connects to the service.
598  *
599  * @param cls closure for the service
600  * @param c the new client that connected to the service
601  * @param mq the message queue used to send messages to the client
602  * @return @a c
603  */
604 static void *
605 client_connect_cb (void *cls,
606                    struct GNUNET_SERVICE_Client *c,
607                    struct GNUNET_MQ_Handle *mq)
608 {
609   return c;
610 }
611
612
613 /**
614  * Clean up after a client has disconnected
615  *
616  * @param cls closure, unused
617  * @param client the client to clean up after
618  * @param internal_cls our client-specific internal data structure
619  */
620 static void
621 client_disconnect_cb (void *cls,
622                       struct GNUNET_SERVICE_Client *client,
623                       void *internal_cls)
624 {
625   struct Listener *listener;
626   struct Set *set;
627
628   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629               "client disconnected, cleaning up\n");
630   set = set_get (client);
631   if (NULL != set)
632   {
633     set->client = NULL;
634     set_destroy (set);
635     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636                 "Client's set destroyed\n");
637   }
638   listener = listener_get (client);
639   if (NULL != listener)
640   {
641     listener->client = NULL;
642     listener_destroy (listener);
643     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644                 "Client's listener destroyed\n");
645   }
646 }
647
648
649 /**
650  * Destroy an incoming request from a remote peer
651  *
652  * @param incoming remote request to destroy
653  */
654 static void
655 incoming_destroy (struct Operation *incoming)
656 {
657   struct GNUNET_CADET_Channel *channel;
658
659   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
660   GNUNET_CONTAINER_DLL_remove (incoming_head,
661                                incoming_tail,
662                                incoming);
663   if (NULL != incoming->timeout_task)
664   {
665     GNUNET_SCHEDULER_cancel (incoming->timeout_task);
666     incoming->timeout_task = NULL;
667   }
668   /* make sure that the tunnel end handler will not destroy us again */
669   incoming->vt = NULL;
670   if (NULL != incoming->spec)
671   {
672     GNUNET_free (incoming->spec);
673     incoming->spec = NULL;
674   }
675   if (NULL != (channel = incoming->channel))
676   {
677     incoming->channel = NULL;
678     GNUNET_CADET_channel_destroy (channel);
679   }
680 }
681
682
683 /**
684  * Suggest the given request to the listener. The listening client can
685  * then accept or reject the remote request.
686  *
687  * @param incoming the incoming peer with the request to suggest
688  * @param listener the listener to suggest the request to
689  */
690 static void
691 incoming_suggest (struct Operation *incoming,
692                   struct Listener *listener)
693 {
694   struct GNUNET_MQ_Envelope *mqm;
695   struct GNUNET_SET_RequestMessage *cmsg;
696
697   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
698   GNUNET_assert (NULL != incoming->spec);
699   GNUNET_assert (0 == incoming->suggest_id);
700   incoming->suggest_id = suggest_id++;
701   if (0 == suggest_id)
702     suggest_id++;
703   GNUNET_assert (NULL != incoming->timeout_task);
704   GNUNET_SCHEDULER_cancel (incoming->timeout_task);
705   incoming->timeout_task = NULL;
706   mqm = GNUNET_MQ_msg_nested_mh (cmsg,
707                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
708                                  incoming->spec->context_msg);
709   GNUNET_assert (NULL != mqm);
710   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
711               "Suggesting incoming request with accept id %u to listener\n",
712               incoming->suggest_id);
713   cmsg->accept_id = htonl (incoming->suggest_id);
714   cmsg->peer_id = incoming->spec->peer;
715   GNUNET_MQ_send (listener->client_mq,
716                   mqm);
717 }
718
719
720 /**
721  * Handle a request for a set operation from another peer.  Checks if we
722  * have a listener waiting for such a request (and in that case initiates
723  * asking the listener about accepting the connection). If no listener
724  * is waiting, we queue the operation request in hope that a listener
725  * shows up soon (before timeout).
726  *
727  * This msg is expected as the first and only msg handled through the
728  * non-operation bound virtual table, acceptance of this operation replaces
729  * our virtual table and subsequent msgs would be routed differently (as
730  * we then know what type of operation this is).
731  *
732  * @param op the operation state
733  * @param mh the received message
734  * @return #GNUNET_OK if the channel should be kept alive,
735  *         #GNUNET_SYSERR to destroy the channel
736  */
737 static int
738 handle_incoming_msg (struct Operation *op,
739                      const struct GNUNET_MessageHeader *mh)
740 {
741   const struct OperationRequestMessage *msg;
742   struct Listener *listener = op->listener;
743   struct OperationSpecification *spec;
744   const struct GNUNET_MessageHeader *nested_context;
745
746   msg = (const struct OperationRequestMessage *) mh;
747   GNUNET_assert (GNUNET_YES == op->is_incoming);
748   if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
749   {
750     GNUNET_break_op (0);
751     return GNUNET_SYSERR;
752   }
753   /* double operation request */
754   if (NULL != op->spec)
755   {
756     GNUNET_break_op (0);
757     return GNUNET_SYSERR;
758   }
759   spec = GNUNET_new (struct OperationSpecification);
760   nested_context = GNUNET_MQ_extract_nested_mh (msg);
761   if ( (NULL != nested_context) &&
762        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
763   {
764     GNUNET_break_op (0);
765     GNUNET_free (spec);
766     return GNUNET_SYSERR;
767   }
768   /* Make a copy of the nested_context (application-specific context
769      information that is opaque to set) so we can pass it to the
770      listener later on */
771   if (NULL != nested_context)
772     spec->context_msg = GNUNET_copy_message (nested_context);
773   spec->operation = ntohl (msg->operation);
774   spec->app_id = listener->app_id;
775   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
776                                          UINT32_MAX);
777   spec->peer = op->peer;
778   spec->remote_element_count = ntohl (msg->element_count);
779   op->spec = spec;
780
781   listener = op->listener;
782   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783               "Received P2P operation request (op %u, port %s) for active listener\n",
784               (uint32_t) ntohl (msg->operation),
785               GNUNET_h2s (&listener->app_id));
786   incoming_suggest (op,
787                     listener);
788   return GNUNET_OK;
789 }
790
791
792 static void
793 execute_add (struct Set *set,
794              const struct GNUNET_MessageHeader *m)
795 {
796   const struct GNUNET_SET_ElementMessage *msg;
797   struct GNUNET_SET_Element el;
798   struct ElementEntry *ee;
799   struct GNUNET_HashCode hash;
800
801   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type));
802
803   msg = (const struct GNUNET_SET_ElementMessage *) m;
804   el.size = ntohs (m->size) - sizeof *msg;
805   el.data = &msg[1];
806   el.element_type = ntohs (msg->element_type);
807   GNUNET_SET_element_hash (&el, &hash);
808
809   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
810                                           &hash);
811
812   if (NULL == ee)
813   {
814     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815                 "Client inserts element %s of size %u\n",
816                 GNUNET_h2s (&hash),
817                 el.size);
818     ee = GNUNET_malloc (el.size + sizeof *ee);
819     ee->element.size = el.size;
820     GNUNET_memcpy (&ee[1],
821             el.data,
822             el.size);
823     ee->element.data = &ee[1];
824     ee->element.element_type = el.element_type;
825     ee->remote = GNUNET_NO;
826     ee->mutations = NULL;
827     ee->mutations_size = 0;
828     ee->element_hash = hash;
829     GNUNET_break (GNUNET_YES ==
830                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
831                                                      &ee->element_hash,
832                                                      ee,
833                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
834   }
835   else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
836   {
837     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838                 "Client inserted element %s of size %u twice (ignored)\n",
839                 GNUNET_h2s (&hash),
840                 el.size);
841
842     /* same element inserted twice */
843     return;
844   }
845
846   {
847     struct MutationEvent mut = {
848       .generation = set->current_generation,
849       .added = GNUNET_YES
850     };
851     GNUNET_array_append (ee->mutations,
852                          ee->mutations_size,
853                          mut);
854   }
855
856   set->vt->add (set->state, ee);
857 }
858
859
860 static void
861 execute_remove (struct Set *set,
862                 const struct GNUNET_MessageHeader *m)
863 {
864   const struct GNUNET_SET_ElementMessage *msg;
865   struct GNUNET_SET_Element el;
866   struct ElementEntry *ee;
867   struct GNUNET_HashCode hash;
868
869   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type));
870
871   msg = (const struct GNUNET_SET_ElementMessage *) m;
872   el.size = ntohs (m->size) - sizeof *msg;
873   el.data = &msg[1];
874   el.element_type = ntohs (msg->element_type);
875   GNUNET_SET_element_hash (&el, &hash);
876   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
877                                           &hash);
878   if (NULL == ee)
879   {
880     /* Client tried to remove non-existing element. */
881     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882                 "Client removes non-existing element of size %u\n",
883                 el.size);
884     return;
885   }
886   if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
887   {
888     /* Client tried to remove element twice */
889     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                 "Client removed element of size %u twice (ignored)\n",
891                 el.size);
892     return;
893   }
894   else
895   {
896     struct MutationEvent mut = {
897       .generation = set->current_generation,
898       .added = GNUNET_NO
899     };
900
901     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
902                 "Client removes element of size %u\n",
903                 el.size);
904
905     GNUNET_array_append (ee->mutations,
906                          ee->mutations_size,
907                          mut);
908   }
909   set->vt->remove (set->state, ee);
910 }
911
912
913
914 static void
915 execute_mutation (struct Set *set,
916                   const struct GNUNET_MessageHeader *m)
917 {
918   switch (ntohs (m->type))
919   {
920     case GNUNET_MESSAGE_TYPE_SET_ADD:
921       execute_add (set, m);
922       break;
923     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
924       execute_remove (set, m);
925       break;
926     default:
927       GNUNET_break (0);
928   }
929 }
930
931
932
933 /**
934  * Send the next element of a set to the set's client.  The next element is given by
935  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
936  * are no more elements in the set.  The caller must ensure that the set's iterator is
937  * valid.
938  *
939  * The client will acknowledge each received element with a
940  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
941  * #handle_client_iter_ack() will then trigger the next transmission.
942  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
943  *
944  * @param set set that should send its next element to its client
945  */
946 static void
947 send_client_element (struct Set *set)
948 {
949   int ret;
950   struct ElementEntry *ee;
951   struct GNUNET_MQ_Envelope *ev;
952   struct GNUNET_SET_IterResponseMessage *msg;
953
954   GNUNET_assert (NULL != set->iter);
955
956 again:
957
958   ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
959                                                      NULL,
960                                                      (const void **) &ee);
961   if (GNUNET_NO == ret)
962   {
963     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964                 "Iteration on %p done.\n",
965                 (void *) set);
966     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
967     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
968     set->iter = NULL;
969     set->iteration_id++;
970
971     GNUNET_assert (set->content->iterator_count > 0);
972     set->content->iterator_count -= 1;
973
974     if (0 == set->content->iterator_count)
975     {
976       while (NULL != set->content->pending_mutations_head)
977       {
978         struct PendingMutation *pm;
979
980         pm = set->content->pending_mutations_head;
981         GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
982                                      set->content->pending_mutations_tail,
983                                      pm);
984         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                     "Executing pending mutation on %p.\n",
986                     (void *) pm->set);
987         execute_mutation (pm->set, pm->mutation_message);
988         GNUNET_free (pm->mutation_message);
989         GNUNET_free (pm);
990       }
991     }
992
993   }
994   else
995   {
996     GNUNET_assert (NULL != ee);
997
998     if (GNUNET_NO == is_element_of_iteration (ee, set))
999       goto again;
1000
1001     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002                 "Sending iteration element on %p.\n",
1003                 (void *) set);
1004     ev = GNUNET_MQ_msg_extra (msg,
1005                               ee->element.size,
1006                               GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1007     GNUNET_memcpy (&msg[1],
1008             ee->element.data,
1009             ee->element.size);
1010     msg->element_type = htons (ee->element.element_type);
1011     msg->iteration_id = htons (set->iteration_id);
1012   }
1013   GNUNET_MQ_send (set->client_mq, ev);
1014 }
1015
1016
1017 /**
1018  * Called when a client wants to iterate the elements of a set.
1019  * Checks if we have a set associated with the client and if we
1020  * can right now start an iteration. If all checks out, starts
1021  * sending the elements of the set to the client.
1022  *
1023  * @param cls client that sent the message
1024  * @param m message sent by the client
1025  */
1026 static void
1027 handle_client_iterate (void *cls,
1028                        const struct GNUNET_MessageHeader *m)
1029 {
1030   struct GNUNET_SERVICE_Client *client = cls;
1031   struct Set *set;
1032
1033   set = set_get (client);
1034   if (NULL == set)
1035   {
1036     /* attempt to iterate over a non existing set */
1037     GNUNET_break (0);
1038     GNUNET_SERVICE_client_drop (client);
1039     return;
1040   }
1041   if (NULL != set->iter)
1042   {
1043     /* Only one concurrent iterate-action allowed per set */
1044     GNUNET_break (0);
1045     GNUNET_SERVICE_client_drop (client);
1046     return;
1047   }
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049               "Iterating set %p in gen %u with %u content elements\n",
1050               (void *) set,
1051               set->current_generation,
1052               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1053   GNUNET_SERVICE_client_continue (client);
1054   set->content->iterator_count += 1;
1055   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1056   set->iter_generation = set->current_generation;
1057   send_client_element (set);
1058 }
1059
1060
1061 /**
1062  * Called when a client wants to create a new set.  This is typically
1063  * the first request from a client, and includes the type of set
1064  * operation to be performed.
1065  *
1066  * @param cls client that sent the message
1067  * @param m message sent by the client
1068  */
1069 static void
1070 handle_client_create_set (void *cls,
1071                           const struct GNUNET_SET_CreateMessage *msg)
1072 {
1073   struct GNUNET_SERVICE_Client *client = cls;
1074   struct Set *set;
1075
1076   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077               "Client created new set (operation %u)\n",
1078               (uint32_t) ntohl (msg->operation));
1079   if (NULL != set_get (client))
1080   {
1081     /* There can only be one set per client */
1082     GNUNET_break (0);
1083     GNUNET_SERVICE_client_drop (client);
1084     return;
1085   }
1086   set = GNUNET_new (struct Set);
1087   switch (ntohl (msg->operation))
1088   {
1089   case GNUNET_SET_OPERATION_INTERSECTION:
1090     set->vt = _GSS_intersection_vt ();
1091     break;
1092   case GNUNET_SET_OPERATION_UNION:
1093     set->vt = _GSS_union_vt ();
1094     break;
1095   default:
1096     GNUNET_free (set);
1097     GNUNET_break (0);
1098     GNUNET_SERVICE_client_drop (client);
1099     return;
1100   }
1101   set->operation = ntohl (msg->operation);
1102   set->state = set->vt->create ();
1103   if (NULL == set->state)
1104   {
1105     /* initialization failed (i.e. out of memory) */
1106     GNUNET_free (set);
1107     GNUNET_SERVICE_client_drop (client);
1108     return;
1109   }
1110   set->content = GNUNET_new (struct SetContent);
1111   set->content->refcount = 1;
1112   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1113   set->client = client;
1114   set->client_mq = GNUNET_SERVICE_client_get_mq (client);
1115   GNUNET_CONTAINER_DLL_insert (sets_head,
1116                                sets_tail,
1117                                set);
1118   GNUNET_SERVICE_client_continue (client);
1119 }
1120
1121
1122 /**
1123  * Timeout happens iff:
1124  *  - we suggested an operation to our listener,
1125  *    but did not receive a response in time
1126  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1127  *
1128  * @param cls channel context
1129  * @param tc context information (why was this task triggered now)
1130  */
1131 static void
1132 incoming_timeout_cb (void *cls)
1133 {
1134   struct Operation *incoming = cls;
1135
1136   incoming->timeout_task = NULL;
1137   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139               "Remote peer's incoming request timed out\n");
1140   incoming_destroy (incoming);
1141 }
1142
1143
1144 /**
1145  * Terminates an incoming operation in case we have not yet received an
1146  * operation request. Called by the channel destruction handler.
1147  *
1148  * @param op the channel context
1149  */
1150 static void
1151 handle_incoming_disconnect (struct Operation *op)
1152 {
1153   GNUNET_assert (GNUNET_YES == op->is_incoming);
1154   /* channel is already dead, incoming_destroy must not
1155    * destroy it ... */
1156   op->channel = NULL;
1157   incoming_destroy (op);
1158   op->vt = NULL;
1159 }
1160
1161
1162 /**
1163  * Method called whenever another peer has added us to a channel the
1164  * other peer initiated.  Only called (once) upon reception of data
1165  * from a channel we listen on.
1166  *
1167  * The channel context represents the operation itself and gets added
1168  * to a DLL, from where it gets looked up when our local listener
1169  * client responds to a proposed/suggested operation or connects and
1170  * associates with this operation.
1171  *
1172  * @param cls closure
1173  * @param channel new handle to the channel
1174  * @param source peer that started the channel
1175  * @return initial channel context for the channel
1176  *         returns NULL on error
1177  */
1178 static void *
1179 channel_new_cb (void *cls,
1180                 struct GNUNET_CADET_Channel *channel,
1181                 const struct GNUNET_PeerIdentity *source)
1182 {
1183   static const struct SetVT incoming_vt = {
1184     .msg_handler = &handle_incoming_msg,
1185     .peer_disconnect = &handle_incoming_disconnect
1186   };
1187   struct Listener *listener = cls;
1188   struct Operation *incoming;
1189
1190   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191               "New incoming channel\n");
1192   incoming = GNUNET_new (struct Operation);
1193   incoming->listener = listener;
1194   incoming->is_incoming = GNUNET_YES;
1195   incoming->peer = *source;
1196   incoming->channel = channel;
1197   incoming->mq = GNUNET_CADET_get_mq (incoming->channel);
1198   incoming->vt = &incoming_vt;
1199   incoming->timeout_task
1200     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1201                                     &incoming_timeout_cb,
1202                                     incoming);
1203   GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
1204                                     incoming_tail,
1205                                     incoming);
1206   // incoming_suggest (incoming,
1207   //                  listener);
1208   return incoming;
1209 }
1210
1211
1212 /**
1213  * Function called whenever a channel is destroyed.  Should clean up
1214  * any associated state.  It must NOT call
1215  * GNUNET_CADET_channel_destroy() on the channel.
1216  *
1217  * The peer_disconnect function is part of a a virtual table set initially either
1218  * when a peer creates a new channel with us, or once we create
1219  * a new channel ourselves (evaluate).
1220  *
1221  * Once we know the exact type of operation (union/intersection), the vt is
1222  * replaced with an operation specific instance (_GSS_[op]_vt).
1223  *
1224  * @param channel_ctx place where local state associated
1225  *                   with the channel is stored
1226  * @param channel connection to the other end (henceforth invalid)
1227  */
1228 static void
1229 channel_end_cb (void *channel_ctx,
1230                 const struct GNUNET_CADET_Channel *channel)
1231 {
1232   struct Operation *op = channel_ctx;
1233
1234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235               "channel_end_cb called\n");
1236   op->channel = NULL;
1237   op->keep++;
1238   /* the vt can be null if a client already requested canceling op. */
1239   if (NULL != op->vt)
1240   {
1241     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1242                 "calling peer disconnect due to channel end\n");
1243     op->vt->peer_disconnect (op);
1244   }
1245   op->keep--;
1246   if (0 == op->keep)
1247   {
1248     /* cadet will never call us with the context again! */
1249     GNUNET_free (op);
1250   }
1251   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1252               "channel_end_cb finished\n");
1253 }
1254
1255
1256 /**
1257  * Function called whenever an MQ-channel's transmission window size changes.
1258  *
1259  * The first callback in an outgoing channel will be with a non-zero value
1260  * and will mean the channel is connected to the destination.
1261  *
1262  * For an incoming channel it will be called immediately after the
1263  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1264  *
1265  * @param cls Channel closure.
1266  * @param channel Connection to the other end (henceforth invalid).
1267  * @param window_size New window size. If the is more messages than buffer size
1268  *                    this value will be negative..
1269  */
1270 static void
1271 channel_window_cb (void *cls,
1272                    const struct GNUNET_CADET_Channel *channel,
1273                    int window_size)
1274 {
1275   /* FIXME: not implemented, we could do flow control here... */
1276 }
1277
1278 /**
1279  * FIXME: hack-job. Migrate to proper handler array use!
1280  *
1281  * @param cls local state associated with the channel.
1282  * @param message The actual message.
1283  */
1284 static int
1285 check_p2p_message (void *cls,
1286                    const struct GNUNET_MessageHeader *message)
1287 {
1288   return GNUNET_OK;
1289 }
1290
1291
1292 /**
1293  * FIXME: hack-job. Migrate to proper handler array use!
1294  *
1295  * Functions with this signature are called whenever a message is
1296  * received via a cadet channel.
1297  *
1298  * The msg_handler is a virtual table set in initially either when a peer
1299  * creates a new channel with us, or once we create a new channel
1300  * ourselves (evaluate).
1301  *
1302  * Once we know the exact type of operation (union/intersection), the vt is
1303  * replaced with an operation specific instance (_GSS_[op]_vt).
1304  *
1305  * @param cls local state associated with the channel.
1306  * @param message The actual message.
1307  */
1308 static void
1309 handle_p2p_message (void *cls,
1310                     const struct GNUNET_MessageHeader *message)
1311 {
1312   struct Operation *op = cls;
1313   int ret;
1314
1315   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316               "Dispatching cadet message (type: %u)\n",
1317               ntohs (message->type));
1318   /* do this before the handler, as the handler might kill the channel */
1319   GNUNET_CADET_receive_done (op->channel);
1320   if (NULL != op->vt)
1321     ret = op->vt->msg_handler (op,
1322                                message);
1323   else
1324     ret = GNUNET_SYSERR;
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326               "Handled cadet message (type: %u)\n",
1327               ntohs (message->type));
1328   if (GNUNET_OK != ret)
1329     GNUNET_CADET_channel_destroy (op->channel);
1330 }
1331
1332
1333 /**
1334  * Called when a client wants to create a new listener.
1335  *
1336  * @param cls client that sent the message
1337  * @param msg message sent by the client
1338  */
1339 static void
1340 handle_client_listen (void *cls,
1341                       const struct GNUNET_SET_ListenMessage *msg)
1342 {
1343   struct GNUNET_SERVICE_Client *client = cls;
1344   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1345     GNUNET_MQ_hd_var_size (p2p_message,
1346                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1347                            struct GNUNET_MessageHeader,
1348                            NULL),
1349     GNUNET_MQ_hd_var_size (p2p_message,
1350                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1351                            struct GNUNET_MessageHeader,
1352                            NULL),
1353     GNUNET_MQ_hd_var_size (p2p_message,
1354                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1355                            struct GNUNET_MessageHeader,
1356                            NULL),
1357     GNUNET_MQ_hd_var_size (p2p_message,
1358                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1359                            struct GNUNET_MessageHeader,
1360                            NULL),
1361     GNUNET_MQ_hd_var_size (p2p_message,
1362                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1363                            struct GNUNET_MessageHeader,
1364                            NULL),
1365     GNUNET_MQ_hd_var_size (p2p_message,
1366                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1367                            struct GNUNET_MessageHeader,
1368                            NULL),
1369     GNUNET_MQ_hd_var_size (p2p_message,
1370                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1371                            struct GNUNET_MessageHeader,
1372                            NULL),
1373     GNUNET_MQ_hd_var_size (p2p_message,
1374                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1375                            struct GNUNET_MessageHeader,
1376                            NULL),
1377     GNUNET_MQ_hd_var_size (p2p_message,
1378                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1379                            struct GNUNET_MessageHeader,
1380                            NULL),
1381     GNUNET_MQ_hd_var_size (p2p_message,
1382                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1383                            struct GNUNET_MessageHeader,
1384                            NULL),
1385     GNUNET_MQ_hd_var_size (p2p_message,
1386                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1387                            struct GNUNET_MessageHeader,
1388                            NULL),
1389     GNUNET_MQ_hd_var_size (p2p_message,
1390                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1391                            struct GNUNET_MessageHeader,
1392                            NULL),
1393     GNUNET_MQ_hd_var_size (p2p_message,
1394                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1395                            struct GNUNET_MessageHeader,
1396                            NULL),
1397     GNUNET_MQ_hd_var_size (p2p_message,
1398                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1399                            struct GNUNET_MessageHeader,
1400                            NULL),
1401     GNUNET_MQ_hd_var_size (p2p_message,
1402                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1403                            struct GNUNET_MessageHeader,
1404                            NULL),
1405     GNUNET_MQ_handler_end ()
1406   };
1407   struct Listener *listener;
1408
1409   if (NULL != listener_get (client))
1410   {
1411     /* max. one active listener per client! */
1412     GNUNET_break (0);
1413     GNUNET_SERVICE_client_drop (client);
1414     return;
1415   }
1416   listener = GNUNET_new (struct Listener);
1417   listener->client = client;
1418   listener->client_mq = GNUNET_SERVICE_client_get_mq (client);
1419   listener->app_id = msg->app_id;
1420   listener->operation = ntohl (msg->operation);
1421   GNUNET_CONTAINER_DLL_insert_tail (listeners_head,
1422                                     listeners_tail,
1423                                     listener);
1424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425               "New listener created (op %u, port %s)\n",
1426               listener->operation,
1427               GNUNET_h2s (&listener->app_id));
1428   listener->open_port = GNUNET_CADET_open_porT (cadet,
1429                                                 &msg->app_id,
1430                                                 &channel_new_cb,
1431                                                 listener,
1432                                                 &channel_window_cb,
1433                                                 &channel_end_cb,
1434                                                 cadet_handlers);
1435   /* check for existing incoming requests the listener might be interested in */
1436   for (struct Operation *op = incoming_head; NULL != op; op = op->next)
1437   {
1438     if (NULL == op->spec)
1439       continue; /* no details available yet */
1440     if (0 != op->suggest_id)
1441       continue; /* this one has been already suggested to a listener */
1442     if (listener->operation != op->spec->operation)
1443       continue; /* incompatible operation */
1444     if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
1445                                      &op->spec->app_id))
1446       continue; /* incompatible appliation */
1447     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1448                 "Found matching existing request\n");
1449     incoming_suggest (op,
1450                       listener);
1451   }
1452   GNUNET_SERVICE_client_continue (client);
1453 }
1454
1455
1456 /**
1457  * Called when the listening client rejects an operation
1458  * request by another peer.
1459  *
1460  * @param cls client that sent the message
1461  * @param msg message sent by the client
1462  */
1463 static void
1464 handle_client_reject (void *cls,
1465                       const struct GNUNET_SET_RejectMessage *msg)
1466 {
1467   struct GNUNET_SERVICE_Client *client = cls;
1468   struct Operation *incoming;
1469
1470   incoming = get_incoming (ntohl (msg->accept_reject_id));
1471   if (NULL == incoming)
1472   {
1473     /* no matching incoming operation for this reject */
1474     GNUNET_break (0);
1475     GNUNET_SERVICE_client_drop (client);
1476     return;
1477   }
1478   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1479               "Peer request (op %u, app %s) rejected by client\n",
1480               incoming->spec->operation,
1481               GNUNET_h2s (&incoming->spec->app_id));
1482   GNUNET_CADET_channel_destroy (incoming->channel);
1483   GNUNET_SERVICE_client_continue (client);
1484 }
1485
1486
1487 /**
1488  * Called when a client wants to add or remove an element to a set it inhabits.
1489  *
1490  * @param cls client that sent the message
1491  * @param m message sent by the client
1492  */
1493 static int
1494 check_client_mutation (void *cls,
1495                        const struct GNUNET_MessageHeader *m)
1496 {
1497   /* FIXME: any check we might want to do here? */
1498   return GNUNET_OK;
1499 }
1500
1501
1502 /**
1503  * Called when a client wants to add or remove an element to a set it inhabits.
1504  *
1505  * @param cls client that sent the message
1506  * @param m message sent by the client
1507  */
1508 static void
1509 handle_client_mutation (void *cls,
1510                         const struct GNUNET_MessageHeader *m)
1511 {
1512   struct GNUNET_SERVICE_Client *client = cls;
1513   struct Set *set;
1514
1515   set = set_get (client);
1516   if (NULL == set)
1517   {
1518     /* client without a set requested an operation */
1519     GNUNET_break (0);
1520     GNUNET_SERVICE_client_drop (client);
1521     return;
1522   }
1523
1524   GNUNET_SERVICE_client_continue (client);
1525
1526   if (0 != set->content->iterator_count)
1527   {
1528     struct PendingMutation *pm;
1529
1530     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1531                 "Scheduling mutation on set\n");
1532
1533     pm = GNUNET_new (struct PendingMutation);
1534     pm->mutation_message = GNUNET_copy_message (m);
1535     pm->set = set;
1536     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1537                                       set->content->pending_mutations_tail,
1538                                       pm);
1539     return;
1540   }
1541   execute_mutation (set, m);
1542 }
1543
1544
1545 /**
1546  * Advance the current generation of a set,
1547  * adding exclusion ranges if necessary.
1548  *
1549  * @param set the set where we want to advance the generation
1550  */
1551 static void
1552 advance_generation (struct Set *set)
1553 {
1554   struct GenerationRange r;
1555
1556   if (set->current_generation == set->content->latest_generation)
1557   {
1558     set->content->latest_generation += 1;
1559     set->current_generation += 1;
1560     return;
1561   }
1562
1563   GNUNET_assert (set->current_generation < set->content->latest_generation);
1564
1565   r.start = set->current_generation + 1;
1566   r.end = set->content->latest_generation + 1;
1567
1568   set->content->latest_generation = r.end;
1569   set->current_generation = r.end;
1570
1571   GNUNET_array_append (set->excluded_generations,
1572                        set->excluded_generations_size,
1573                        r);
1574 }
1575
1576
1577 /**
1578  * Called when a client wants to initiate a set operation with another
1579  * peer.  Initiates the CADET connection to the listener and sends the
1580  * request.
1581  *
1582  * @param cls client that sent the message
1583  * @param msg message sent by the client
1584  * @return #GNUNET_OK if the message is well-formed
1585  */
1586 static int
1587 check_client_evaluate (void *cls,
1588                         const struct GNUNET_SET_EvaluateMessage *msg)
1589 {
1590   /* FIXME: suboptimal, even if the context below could be NULL,
1591      there are malformed messages this does not check for... */
1592   return GNUNET_OK;
1593 }
1594
1595
1596 /**
1597  * Called when a client wants to initiate a set operation with another
1598  * peer.  Initiates the CADET connection to the listener and sends the
1599  * request.
1600  *
1601  * @param cls client that sent the message
1602  * @param msg message sent by the client
1603  */
1604 static void
1605 handle_client_evaluate (void *cls,
1606                         const struct GNUNET_SET_EvaluateMessage *msg)
1607 {
1608   struct GNUNET_SERVICE_Client *client = cls;
1609   struct Operation *op = GNUNET_new (struct Operation);
1610   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1611     GNUNET_MQ_hd_var_size (p2p_message,
1612                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1613                            struct GNUNET_MessageHeader,
1614                            op),
1615     GNUNET_MQ_hd_var_size (p2p_message,
1616                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1617                            struct GNUNET_MessageHeader,
1618                            op),
1619     GNUNET_MQ_hd_var_size (p2p_message,
1620                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1621                            struct GNUNET_MessageHeader,
1622                            op),
1623     GNUNET_MQ_hd_var_size (p2p_message,
1624                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1625                            struct GNUNET_MessageHeader,
1626                            op),
1627     GNUNET_MQ_hd_var_size (p2p_message,
1628                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1629                            struct GNUNET_MessageHeader,
1630                            op),
1631     GNUNET_MQ_hd_var_size (p2p_message,
1632                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1633                            struct GNUNET_MessageHeader,
1634                            op),
1635     GNUNET_MQ_hd_var_size (p2p_message,
1636                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1637                            struct GNUNET_MessageHeader,
1638                            op),
1639     GNUNET_MQ_hd_var_size (p2p_message,
1640                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1641                            struct GNUNET_MessageHeader,
1642                            op),
1643     GNUNET_MQ_hd_var_size (p2p_message,
1644                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1645                            struct GNUNET_MessageHeader,
1646                            op),
1647     GNUNET_MQ_hd_var_size (p2p_message,
1648                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1649                            struct GNUNET_MessageHeader,
1650                            op),
1651     GNUNET_MQ_hd_var_size (p2p_message,
1652                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1653                            struct GNUNET_MessageHeader,
1654                            op),
1655     GNUNET_MQ_hd_var_size (p2p_message,
1656                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1657                            struct GNUNET_MessageHeader,
1658                            op),
1659     GNUNET_MQ_hd_var_size (p2p_message,
1660                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1661                            struct GNUNET_MessageHeader,
1662                            op),
1663     GNUNET_MQ_hd_var_size (p2p_message,
1664                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1665                            struct GNUNET_MessageHeader,
1666                            op),
1667     GNUNET_MQ_hd_var_size (p2p_message,
1668                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1669                            struct GNUNET_MessageHeader,
1670                            op),
1671     GNUNET_MQ_handler_end ()
1672   };
1673   struct Set *set;
1674   struct OperationSpecification *spec;
1675   const struct GNUNET_MessageHeader *context;
1676
1677   set = set_get (client);
1678   if (NULL == set)
1679   {
1680     GNUNET_break (0);
1681     GNUNET_free (op);
1682     GNUNET_SERVICE_client_drop (client);
1683     return;
1684   }
1685   spec = GNUNET_new (struct OperationSpecification);
1686   spec->operation = set->operation;
1687   spec->app_id = msg->app_id;
1688   spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1689                                          UINT32_MAX);
1690   spec->peer = msg->target_peer;
1691   spec->set = set;
1692   spec->result_mode = ntohl (msg->result_mode);
1693   spec->client_request_id = ntohl (msg->request_id);
1694   spec->byzantine = msg->byzantine;
1695   spec->byzantine_lower_bound = msg->byzantine_lower_bound;
1696   spec->force_full = msg->force_full;
1697   spec->force_delta = msg->force_delta;
1698   context = GNUNET_MQ_extract_nested_mh (msg);
1699   op->spec = spec;
1700
1701   // Advance generation values, so that
1702   // mutations won't interfer with the running operation.
1703   op->generation_created = set->current_generation;
1704   advance_generation (set);
1705
1706   op->vt = set->vt;
1707   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1708                                set->ops_tail,
1709                                op);
1710   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1711               "Creating new CADET channel to port %s\n",
1712               GNUNET_h2s (&msg->app_id));
1713   op->channel = GNUNET_CADET_channel_creatE (cadet,
1714                                              op,
1715                                              &msg->target_peer,
1716                                              &msg->app_id,
1717                                              GNUNET_CADET_OPTION_RELIABLE,
1718                                              &channel_window_cb,
1719                                              &channel_end_cb,
1720                                              cadet_handlers);
1721   op->mq = GNUNET_CADET_get_mq (op->channel);
1722   set->vt->evaluate (op,
1723                      context);
1724   GNUNET_SERVICE_client_continue (client);
1725 }
1726
1727
1728 /**
1729  * Handle an ack from a client, and send the next element. Note
1730  * that we only expect acks for set elements, not after the
1731  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1732  *
1733  * @param cls client the client
1734  * @param ack the message
1735  */
1736 static void
1737 handle_client_iter_ack (void *cls,
1738                         const struct GNUNET_SET_IterAckMessage *ack)
1739 {
1740   struct GNUNET_SERVICE_Client *client = cls;
1741   struct Set *set;
1742
1743   set = set_get (client);
1744   if (NULL == set)
1745   {
1746     /* client without a set acknowledged receiving a value */
1747     GNUNET_break (0);
1748     GNUNET_SERVICE_client_drop (client);
1749     return;
1750   }
1751   if (NULL == set->iter)
1752   {
1753     /* client sent an ack, but we were not expecting one (as
1754        set iteration has finished) */
1755     GNUNET_break (0);
1756     GNUNET_SERVICE_client_drop (client);
1757     return;
1758   }
1759   GNUNET_SERVICE_client_continue (client);
1760   if (ntohl (ack->send_more))
1761   {
1762     send_client_element (set);
1763   }
1764   else
1765   {
1766     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1767     set->iter = NULL;
1768     set->iteration_id++;
1769   }
1770 }
1771
1772
1773 /**
1774  * Handle a request from the client to copy a set.
1775  *
1776  * @param cls the client
1777  * @param mh the message
1778  */
1779 static void
1780 handle_client_copy_lazy_prepare (void *cls,
1781                                  const struct GNUNET_MessageHeader *mh)
1782 {
1783   struct GNUNET_SERVICE_Client *client = cls;
1784   struct Set *set;
1785   struct LazyCopyRequest *cr;
1786   struct GNUNET_MQ_Envelope *ev;
1787   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1788
1789   set = set_get (client);
1790   if (NULL == set)
1791   {
1792     /* client without a set requested an operation */
1793     GNUNET_break (0);
1794     GNUNET_SERVICE_client_drop (client);
1795     return;
1796   }
1797
1798   cr = GNUNET_new (struct LazyCopyRequest);
1799
1800   cr->cookie = lazy_copy_cookie;
1801   lazy_copy_cookie += 1;
1802   cr->source_set = set;
1803
1804   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1805                                lazy_copy_tail,
1806                                cr);
1807
1808
1809   ev = GNUNET_MQ_msg (resp_msg,
1810                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1811   resp_msg->cookie = cr->cookie;
1812   GNUNET_MQ_send (set->client_mq, ev);
1813
1814
1815   GNUNET_SERVICE_client_continue (client);
1816
1817   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1818               "Client requested lazy copy\n");
1819 }
1820
1821
1822 /**
1823  * Handle a request from the client to connect to a copy of a set.
1824  *
1825  * @param cls the client
1826  * @param msg the message
1827  */
1828 static void
1829 handle_client_copy_lazy_connect (void *cls,
1830                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1831 {
1832   struct GNUNET_SERVICE_Client *client = cls;
1833   struct LazyCopyRequest *cr;
1834   struct Set *set;
1835   int found;
1836
1837   if (NULL != set_get (client))
1838   {
1839     /* There can only be one set per client */
1840     GNUNET_break (0);
1841     GNUNET_SERVICE_client_drop (client);
1842     return;
1843   }
1844
1845   found = GNUNET_NO;
1846
1847   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1848   {
1849     if (cr->cookie == msg->cookie)
1850     {
1851       found = GNUNET_YES;
1852       break;
1853     }
1854   }
1855
1856   if (GNUNET_NO == found)
1857   {
1858     /* client asked for copy with cookie we don't know */
1859     GNUNET_break (0);
1860     GNUNET_SERVICE_client_drop (client);
1861     return;
1862   }
1863
1864   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1865                                lazy_copy_tail,
1866                                cr);
1867
1868   set = GNUNET_new (struct Set);
1869
1870   switch (cr->source_set->operation)
1871   {
1872   case GNUNET_SET_OPERATION_INTERSECTION:
1873     set->vt = _GSS_intersection_vt ();
1874     break;
1875   case GNUNET_SET_OPERATION_UNION:
1876     set->vt = _GSS_union_vt ();
1877     break;
1878   default:
1879     GNUNET_assert (0);
1880     return;
1881   }
1882
1883   if (NULL == set->vt->copy_state)
1884   {
1885     /* Lazy copy not supported for this set operation */
1886     GNUNET_break (0);
1887     GNUNET_free (set);
1888     GNUNET_free (cr);
1889     GNUNET_SERVICE_client_drop (client);
1890     return;
1891   }
1892
1893   set->operation = cr->source_set->operation;
1894   set->state = set->vt->copy_state (cr->source_set);
1895   set->content = cr->source_set->content;
1896   set->content->refcount += 1;
1897
1898   set->current_generation = cr->source_set->current_generation;
1899   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1900   set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations,
1901                                              set->excluded_generations_size * sizeof (struct GenerationRange));
1902
1903   /* Advance the generation of the new set, so that mutations to the
1904      of the cloned set and the source set are independent. */
1905   advance_generation (set);
1906
1907
1908   set->client = client;
1909   set->client_mq = GNUNET_SERVICE_client_get_mq (client);
1910   GNUNET_CONTAINER_DLL_insert (sets_head,
1911                                sets_tail,
1912                                set);
1913
1914   GNUNET_free (cr);
1915
1916   GNUNET_SERVICE_client_continue (client);
1917
1918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919               "Client connected to lazy set\n");
1920 }
1921
1922
1923 /**
1924  * Handle a request from the client to cancel a running set operation.
1925  *
1926  * @param cls the client
1927  * @param msg the message
1928  */
1929 static void
1930 handle_client_cancel (void *cls,
1931                       const struct GNUNET_SET_CancelMessage *msg)
1932 {
1933   struct GNUNET_SERVICE_Client *client = cls;
1934   struct Set *set;
1935   struct Operation *op;
1936   int found;
1937
1938   set = set_get (client);
1939   if (NULL == set)
1940   {
1941     /* client without a set requested an operation */
1942     GNUNET_break (0);
1943     GNUNET_SERVICE_client_drop (client);
1944     return;
1945   }
1946   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1947               "Client requested cancel for op %u\n",
1948               (uint32_t) ntohl (msg->request_id));
1949   found = GNUNET_NO;
1950   for (op = set->ops_head; NULL != op; op = op->next)
1951   {
1952     if (op->spec->client_request_id == ntohl (msg->request_id))
1953     {
1954       found = GNUNET_YES;
1955       break;
1956     }
1957   }
1958   if (GNUNET_NO == found)
1959   {
1960     /* It may happen that the operation was already destroyed due to
1961      * the other peer disconnecting.  The client may not know about this
1962      * yet and try to cancel the (just barely non-existent) operation.
1963      * So this is not a hard error.
1964      */
1965     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1966                 "Client canceled non-existent op\n");
1967   }
1968   else
1969   {
1970     _GSS_operation_destroy (op,
1971                             GNUNET_YES);
1972   }
1973   GNUNET_SERVICE_client_continue (client);
1974 }
1975
1976
1977 /**
1978  * Handle a request from the client to accept a set operation that
1979  * came from a remote peer.  We forward the accept to the associated
1980  * operation for handling
1981  *
1982  * @param cls the client
1983  * @param msg the message
1984  */
1985 static void
1986 handle_client_accept (void *cls,
1987                       const struct GNUNET_SET_AcceptMessage *msg)
1988 {
1989   struct GNUNET_SERVICE_Client *client = cls;
1990   struct Set *set;
1991   struct Operation *op;
1992   struct GNUNET_SET_ResultMessage *result_message;
1993   struct GNUNET_MQ_Envelope *ev;
1994
1995   set = set_get (client);
1996   if (NULL == set)
1997   {
1998     /* client without a set requested to accept */
1999     GNUNET_break (0);
2000     GNUNET_SERVICE_client_drop (client);
2001     return;
2002   }
2003   op = get_incoming (ntohl (msg->accept_reject_id));
2004   if (NULL == op)
2005   {
2006     /* It is not an error if the set op does not exist -- it may
2007      * have been destroyed when the partner peer disconnected. */
2008     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2009                 "Client accepted request that is no longer active\n");
2010     ev = GNUNET_MQ_msg (result_message,
2011                         GNUNET_MESSAGE_TYPE_SET_RESULT);
2012     result_message->request_id = msg->request_id;
2013     result_message->element_type = 0;
2014     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2015     GNUNET_MQ_send (set->client_mq, ev);
2016     GNUNET_SERVICE_client_continue (client);
2017     return;
2018   }
2019
2020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2021               "Client accepting request %u\n",
2022               (uint32_t) ntohl (msg->accept_reject_id));
2023   GNUNET_assert (GNUNET_YES == op->is_incoming);
2024   op->is_incoming = GNUNET_NO;
2025   GNUNET_CONTAINER_DLL_remove (incoming_head,
2026                                incoming_tail,
2027                                op);
2028   op->spec->set = set;
2029   GNUNET_CONTAINER_DLL_insert (set->ops_head,
2030                                set->ops_tail,
2031                                op);
2032   op->spec->client_request_id = ntohl (msg->request_id);
2033   op->spec->result_mode = ntohl (msg->result_mode);
2034   op->spec->byzantine = msg->byzantine;
2035   op->spec->byzantine_lower_bound = msg->byzantine_lower_bound;
2036   op->spec->force_full = msg->force_full;
2037   op->spec->force_delta = msg->force_delta;
2038
2039   // Advance generation values, so that
2040   // mutations won't interfer with the running operation.
2041   op->generation_created = set->current_generation;
2042   advance_generation (set);
2043
2044   op->vt = set->vt;
2045   op->vt->accept (op);
2046   GNUNET_SERVICE_client_continue (client);
2047 }
2048
2049
2050 /**
2051  * Called to clean up, after a shutdown has been requested.
2052  *
2053  * @param cls closure
2054  */
2055 static void
2056 shutdown_task (void *cls)
2057 {
2058   while (NULL != incoming_head)
2059     incoming_destroy (incoming_head);
2060   while (NULL != listeners_head)
2061     listener_destroy (listeners_head);
2062   while (NULL != sets_head)
2063     set_destroy (sets_head);
2064
2065   /* it's important to destroy cadet at the end, as all channels
2066    * must be destroyed before the cadet handle! */
2067   if (NULL != cadet)
2068   {
2069     GNUNET_CADET_disconnect (cadet);
2070     cadet = NULL;
2071   }
2072   GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
2073   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2074               "handled shutdown request\n");
2075 }
2076
2077
2078 /**
2079  * Function called by the service's run
2080  * method to run service-specific setup code.
2081  *
2082  * @param cls closure
2083  * @param cfg configuration to use
2084  * @param service the initialized service
2085  */
2086 static void
2087 run (void *cls,
2088      const struct GNUNET_CONFIGURATION_Handle *cfg,
2089      struct GNUNET_SERVICE_Handle *service)
2090 {
2091   configuration = cfg;
2092   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2093                                  NULL);
2094   _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
2095   cadet = GNUNET_CADET_connecT (cfg);
2096   if (NULL == cadet)
2097   {
2098     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2099                 _("Could not connect to CADET service\n"));
2100     return;
2101   }
2102 }
2103
2104
2105 /**
2106  * Define "main" method using service macro.
2107  */
2108 GNUNET_SERVICE_MAIN
2109 ("set",
2110  GNUNET_SERVICE_OPTION_NONE,
2111  &run,
2112  &client_connect_cb,
2113  &client_disconnect_cb,
2114  NULL,
2115  GNUNET_MQ_hd_fixed_size (client_accept,
2116                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2117                           struct GNUNET_SET_AcceptMessage,
2118                           NULL),
2119  GNUNET_MQ_hd_fixed_size (client_iter_ack,
2120                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2121                           struct GNUNET_SET_IterAckMessage,
2122                           NULL),
2123  GNUNET_MQ_hd_var_size (client_mutation,
2124                         GNUNET_MESSAGE_TYPE_SET_ADD,
2125                         struct GNUNET_MessageHeader,
2126                         NULL),
2127  GNUNET_MQ_hd_fixed_size (client_create_set,
2128                           GNUNET_MESSAGE_TYPE_SET_CREATE,
2129                           struct GNUNET_SET_CreateMessage,
2130                           NULL),
2131  GNUNET_MQ_hd_fixed_size (client_iterate,
2132                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2133                           struct GNUNET_MessageHeader,
2134                           NULL),
2135  GNUNET_MQ_hd_var_size (client_evaluate,
2136                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2137                         struct GNUNET_SET_EvaluateMessage,
2138                         NULL),
2139  GNUNET_MQ_hd_fixed_size (client_listen,
2140                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
2141                           struct GNUNET_SET_ListenMessage,
2142                           NULL),
2143  GNUNET_MQ_hd_fixed_size (client_reject,
2144                           GNUNET_MESSAGE_TYPE_SET_REJECT,
2145                           struct GNUNET_SET_RejectMessage,
2146                           NULL),
2147  GNUNET_MQ_hd_var_size (client_mutation,
2148                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
2149                         struct GNUNET_MessageHeader,
2150                         NULL),
2151  GNUNET_MQ_hd_fixed_size (client_cancel,
2152                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2153                           struct GNUNET_SET_CancelMessage,
2154                           NULL),
2155  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2156                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2157                           struct GNUNET_MessageHeader,
2158                           NULL),
2159  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2160                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2161                           struct GNUNET_SET_CopyLazyConnectMessage,
2162                           NULL),
2163  GNUNET_MQ_handler_end ());
2164
2165
2166 /* end of gnunet-service-set.c */