42d06b275329d0c67b1eac5405af3fc5609806e6
[oweals/gnunet.git] / src / set / gnunet-service-set.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
31
32 /**
33  * How long do we hold on to an incoming channel if there is
34  * no local listener before giving up?
35  */
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38
39 /**
40  * Lazy copy requests made by a client.
41  */
42 struct LazyCopyRequest
43 {
44   /**
45    * Kept in a DLL.
46    */
47   struct LazyCopyRequest *prev;
48
49   /**
50    * Kept in a DLL.
51    */
52   struct LazyCopyRequest *next;
53
54   /**
55    * Which set are we supposed to copy?
56    */
57   struct Set *source_set;
58
59   /**
60    * Cookie identifying the request.
61    */
62   uint32_t cookie;
63
64 };
65
66
67 /**
68  * A listener is inhabited by a client, and waits for evaluation
69  * requests from remote peers.
70  */
71 struct Listener
72 {
73   /**
74    * Listeners are held in a doubly linked list.
75    */
76   struct Listener *next;
77
78   /**
79    * Listeners are held in a doubly linked list.
80    */
81   struct Listener *prev;
82
83   /**
84    * Head of DLL of operations this listener is responsible for.
85    * Once the client has accepted/declined the operation, the
86    * operation is moved to the respective set's operation DLLS.
87    */
88   struct Operation *op_head;
89
90   /**
91    * Tail of DLL of operations this listener is responsible for.
92    * Once the client has accepted/declined the operation, the
93    * operation is moved to the respective set's operation DLLS.
94    */
95   struct Operation *op_tail;
96
97   /**
98    * Client that owns the listener.
99    * Only one client may own a listener.
100    */
101   struct ClientState *cs;
102
103   /**
104    * The port we are listening on with CADET.
105    */
106   struct GNUNET_CADET_Port *open_port;
107
108   /**
109    * Application ID for the operation, used to distinguish
110    * multiple operations of the same type with the same peer.
111    */
112   struct GNUNET_HashCode app_id;
113
114   /**
115    * The type of the operation.
116    */
117   enum GNUNET_SET_OperationType operation;
118 };
119
120
121 /**
122  * Handle to the cadet service, used to listen for and connect to
123  * remote peers.
124  */
125 static struct GNUNET_CADET_Handle *cadet;
126
127 /**
128  * DLL of lazy copy requests by this client.
129  */
130 static struct LazyCopyRequest *lazy_copy_head;
131
132 /**
133  * DLL of lazy copy requests by this client.
134  */
135 static struct LazyCopyRequest *lazy_copy_tail;
136
137 /**
138  * Generator for unique cookie we set per lazy copy request.
139  */
140 static uint32_t lazy_copy_cookie;
141
142 /**
143  * Statistics handle.
144  */
145 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
146
147 /**
148  * Listeners are held in a doubly linked list.
149  */
150 static struct Listener *listener_head;
151
152 /**
153  * Listeners are held in a doubly linked list.
154  */
155 static struct Listener *listener_tail;
156
157 /**
158  * Number of active clients.
159  */
160 static unsigned int num_clients;
161
162 /**
163  * Are we in shutdown? if #GNUNET_YES and the number of clients
164  * drops to zero, disconnect from CADET.
165  */
166 static int in_shutdown;
167
168 /**
169  * Counter for allocating unique IDs for clients, used to identify
170  * incoming operation requests from remote peers, that the client can
171  * choose to accept or refuse.  0 must not be used (reserved for
172  * uninitialized).
173  */
174 static uint32_t suggest_id;
175
176
177 /**
178  * Get the incoming socket associated with the given id.
179  *
180  * @param listener the listener to look in
181  * @param id id to look for
182  * @return the incoming socket associated with the id,
183  *         or NULL if there is none
184  */
185 static struct Operation *
186 get_incoming (uint32_t id)
187 {
188   for (struct Listener *listener = listener_head;
189        NULL != listener;
190        listener = listener->next)
191   {
192     for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
193       if (op->suggest_id == id)
194         return op;
195   }
196   return NULL;
197 }
198
199
200 /**
201  * Destroy an incoming request from a remote peer
202  *
203  * @param op remote request to destroy
204  */
205 static void
206 incoming_destroy (struct Operation *op)
207 {
208   struct Listener *listener;
209   struct GNUNET_CADET_Channel *channel;
210
211   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
212               "Destroying incoming operation %p\n",
213               op);
214   if (NULL != (listener = op->listener))
215   {
216     GNUNET_CONTAINER_DLL_remove (listener->op_head,
217                                  listener->op_tail,
218                                  op);
219     op->listener = NULL;
220   }
221   if (NULL != op->timeout_task)
222   {
223     GNUNET_SCHEDULER_cancel (op->timeout_task);
224     op->timeout_task = NULL;
225   }
226   if (NULL != (channel = op->channel))
227   {
228     op->channel = NULL;
229     GNUNET_CADET_channel_destroy (channel);
230   }
231 }
232
233
234 /**
235  * Context for the #garbage_collect_cb().
236  */
237 struct GarbageContext
238 {
239
240   /**
241    * Map for which we are garbage collecting removed elements.
242    */
243   struct GNUNET_CONTAINER_MultiHashMap *map;
244
245   /**
246    * Lowest generation for which an operation is still pending.
247    */
248   unsigned int min_op_generation;
249
250   /**
251    * Largest generation for which an operation is still pending.
252    */
253   unsigned int max_op_generation;
254
255 };
256
257
258 /**
259  * Function invoked to check if an element can be removed from
260  * the set's history because it is no longer needed.
261  *
262  * @param cls the `struct GarbageContext *`
263  * @param key key of the element in the map
264  * @param value the `struct ElementEntry *`
265  * @return #GNUNET_OK (continue to iterate)
266  */
267 static int
268 garbage_collect_cb (void *cls,
269                     const struct GNUNET_HashCode *key,
270                     void *value)
271 {
272   //struct GarbageContext *gc = cls;
273   //struct ElementEntry *ee = value;
274
275   //if (GNUNET_YES != ee->removed)
276   //  return GNUNET_OK;
277   //if ( (gc->max_op_generation < ee->generation_added) ||
278   //     (ee->generation_removed > gc->min_op_generation) )
279   //{
280   //  GNUNET_assert (GNUNET_YES ==
281   //                 GNUNET_CONTAINER_multihashmap_remove (gc->map,
282   //                                                       key,
283   //                                                       ee));
284   //  GNUNET_free (ee);
285   //}
286   return GNUNET_OK;
287 }
288
289
290 /**
291  * Collect and destroy elements that are not needed anymore, because
292  * their lifetime (as determined by their generation) does not overlap
293  * with any active set operation.
294  *
295  * @param set set to garbage collect
296  */
297 static void
298 collect_generation_garbage (struct Set *set)
299 {
300   struct GarbageContext gc;
301
302   gc.min_op_generation = UINT_MAX;
303   gc.max_op_generation = 0;
304   for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
305   {
306     gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
307                                        op->generation_created);
308     gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
309                                        op->generation_created);
310   }
311   gc.map = set->content->elements;
312   GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
313                                          &garbage_collect_cb,
314                                          &gc);
315 }
316
317
318 /**
319  * Is @a generation in the range of exclusions?
320  *
321  * @param generation generation to query
322  * @param excluded array of generations where the element is excluded
323  * @param excluded_size length of the @a excluded array
324  * @return #GNUNET_YES if @a generation is in any of the ranges
325  */
326 static int
327 is_excluded_generation (unsigned int generation,
328                         struct GenerationRange *excluded,
329                         unsigned int excluded_size)
330 {
331   for (unsigned int i = 0; i < excluded_size; i++)
332     if ( (generation >= excluded[i].start) &&
333          (generation < excluded[i].end) )
334       return GNUNET_YES;
335   return GNUNET_NO;
336 }
337
338
339 /**
340  * Is element @a ee part of the set during @a query_generation?
341  *
342  * @param ee element to test
343  * @param query_generation generation to query
344  * @param excluded array of generations where the element is excluded
345  * @param excluded_size length of the @a excluded array
346  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
347  */
348 static int
349 is_element_of_generation (struct ElementEntry *ee,
350                           unsigned int query_generation,
351                           struct GenerationRange *excluded,
352                           unsigned int excluded_size)
353 {
354   struct MutationEvent *mut;
355   int is_present;
356
357   GNUNET_assert (NULL != ee->mutations);
358   if (GNUNET_YES ==
359       is_excluded_generation (query_generation,
360                               excluded,
361                               excluded_size))
362   {
363     GNUNET_break (0);
364     return GNUNET_NO;
365   }
366
367   is_present = GNUNET_NO;
368
369   /* Could be made faster with binary search, but lists
370      are small, so why bother. */
371   for (unsigned int i = 0; i < ee->mutations_size; i++)
372   {
373     mut = &ee->mutations[i];
374
375     if (mut->generation > query_generation)
376     {
377       /* The mutation doesn't apply to our generation
378          anymore.  We can'b break here, since mutations aren't
379          sorted by generation. */
380       continue;
381     }
382
383     if (GNUNET_YES ==
384         is_excluded_generation (mut->generation,
385                                 excluded,
386                                 excluded_size))
387     {
388       /* The generation is excluded (because it belongs to another
389          fork via a lazy copy) and thus mutations aren't considered
390          for membership testing. */
391       continue;
392     }
393
394     /* This would be an inconsistency in how we manage mutations. */
395     if ( (GNUNET_YES == is_present) &&
396          (GNUNET_YES == mut->added) )
397       GNUNET_assert (0);
398     /* Likewise. */
399     if ( (GNUNET_NO == is_present) &&
400          (GNUNET_NO == mut->added) )
401       GNUNET_assert (0);
402
403     is_present = mut->added;
404   }
405
406   return is_present;
407 }
408
409
410 /**
411  * Is element @a ee part of the set used by @a op?
412  *
413  * @param ee element to test
414  * @param op operation the defines the set and its generation
415  * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
416  */
417 int
418 _GSS_is_element_of_operation (struct ElementEntry *ee,
419                               struct Operation *op)
420 {
421   return is_element_of_generation (ee,
422                                    op->generation_created,
423                                    op->set->excluded_generations,
424                                    op->set->excluded_generations_size);
425 }
426
427
428 /**
429  * Destroy the given operation.  Used for any operation where both
430  * peers were known and that thus actually had a vt and channel.  Must
431  * not be used for operations where 'listener' is still set and we do
432  * not know the other peer.
433  *
434  * Call the implementation-specific cancel function of the operation.
435  * Disconnects from the remote peer.  Does not disconnect the client,
436  * as there may be multiple operations per set.
437  *
438  * @param op operation to destroy
439  * @param gc #GNUNET_YES to perform garbage collection on the set
440  */
441 void
442 _GSS_operation_destroy (struct Operation *op,
443                         int gc)
444 {
445   struct Set *set = op->set;
446   struct GNUNET_CADET_Channel *channel;
447
448   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449               "Destroying operation %p\n",
450               op);
451   GNUNET_assert (NULL == op->listener);
452   if (NULL != op->state)
453   {
454     set->vt->cancel (op);
455     op->state = NULL;
456   }
457   if (NULL != set)
458   {
459     GNUNET_CONTAINER_DLL_remove (set->ops_head,
460                                  set->ops_tail,
461                                  op);
462     op->set = NULL;
463   }
464   if (NULL != op->context_msg)
465   {
466     GNUNET_free (op->context_msg);
467     op->context_msg = NULL;
468   }
469   if (NULL != (channel = op->channel))
470   {
471     /* This will free op; called conditionally as this helper function
472        is also called from within the channel disconnect handler. */
473     op->channel = NULL;
474     GNUNET_CADET_channel_destroy (channel);
475   }
476   if ( (NULL != set) &&
477        (GNUNET_YES == gc) )
478     collect_generation_garbage (set);
479   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
480    * there was a channel end handler that will free 'op' on the call stack. */
481 }
482
483
484 /**
485  * Callback called when a client connects to the service.
486  *
487  * @param cls closure for the service
488  * @param c the new client that connected to the service
489  * @param mq the message queue used to send messages to the client
490  * @return @a `struct ClientState`
491  */
492 static void *
493 client_connect_cb (void *cls,
494                    struct GNUNET_SERVICE_Client *c,
495                    struct GNUNET_MQ_Handle *mq)
496 {
497   struct ClientState *cs;
498
499   num_clients++;
500   cs = GNUNET_new (struct ClientState);
501   cs->client = c;
502   cs->mq = mq;
503   return cs;
504 }
505
506
507 /**
508  * Iterator over hash map entries to free element entries.
509  *
510  * @param cls closure
511  * @param key current key code
512  * @param value a `struct ElementEntry *` to be free'd
513  * @return #GNUNET_YES (continue to iterate)
514  */
515 static int
516 destroy_elements_iterator (void *cls,
517                            const struct GNUNET_HashCode *key,
518                            void *value)
519 {
520   struct ElementEntry *ee = value;
521
522   GNUNET_free_non_null (ee->mutations);
523   GNUNET_free (ee);
524   return GNUNET_YES;
525 }
526
527
528 /**
529  * Clean up after a client has disconnected
530  *
531  * @param cls closure, unused
532  * @param client the client to clean up after
533  * @param internal_cls the `struct ClientState`
534  */
535 static void
536 client_disconnect_cb (void *cls,
537                       struct GNUNET_SERVICE_Client *client,
538                       void *internal_cls)
539 {
540   struct ClientState *cs = internal_cls;
541   struct Operation *op;
542   struct Listener *listener;
543   struct Set *set;
544
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546               "Client disconnected, cleaning up\n");
547   if (NULL != (set = cs->set))
548   {
549     struct SetContent *content = set->content;
550     struct PendingMutation *pm;
551     struct PendingMutation *pm_current;
552     struct LazyCopyRequest *lcr;
553
554     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555                 "Destroying client's set\n");
556     /* Destroy pending set operations */
557     while (NULL != set->ops_head)
558       _GSS_operation_destroy (set->ops_head,
559                               GNUNET_NO);
560
561     /* Destroy operation-specific state */
562     GNUNET_assert (NULL != set->state);
563     set->vt->destroy_set (set->state);
564     set->state = NULL;
565
566     /* Clean up ongoing iterations */
567     if (NULL != set->iter)
568     {
569       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
570       set->iter = NULL;
571       set->iteration_id++;
572     }
573
574     /* discard any pending mutations that reference this set */
575     pm = content->pending_mutations_head;
576     while (NULL != pm)
577     {
578       pm_current = pm;
579       pm = pm->next;
580       if (pm_current->set == set)
581       {
582         GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
583                                      content->pending_mutations_tail,
584                                      pm_current);
585         GNUNET_free (pm_current);
586       }
587     }
588
589     /* free set content (or at least decrement RC) */
590     set->content = NULL;
591     GNUNET_assert (0 != content->refcount);
592     content->refcount--;
593     if (0 == content->refcount)
594     {
595       GNUNET_assert (NULL != content->elements);
596       GNUNET_CONTAINER_multihashmap_iterate (content->elements,
597                                              &destroy_elements_iterator,
598                                              NULL);
599       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
600       content->elements = NULL;
601       GNUNET_free (content);
602     }
603     GNUNET_free_non_null (set->excluded_generations);
604     set->excluded_generations = NULL;
605
606     /* remove set from pending copy requests */
607     lcr = lazy_copy_head;
608     while (NULL != lcr)
609     {
610       struct LazyCopyRequest *lcr_current = lcr;
611
612       lcr = lcr->next;
613       if (lcr_current->source_set == set)
614       {
615         GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
616                                      lazy_copy_tail,
617                                      lcr_current);
618         GNUNET_free (lcr_current);
619       }
620     }
621     GNUNET_free (set);
622   }
623
624   if (NULL != (listener = cs->listener))
625   {
626     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627                 "Destroying client's listener\n");
628     GNUNET_CADET_close_port (listener->open_port);
629     listener->open_port = NULL;
630     while (NULL != (op = listener->op_head))
631     {
632       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
633                   "Destroying incoming operation `%u' from peer `%s'\n",
634                   (unsigned int) op->client_request_id,
635                   GNUNET_i2s (&op->peer));
636       incoming_destroy (op);
637     }
638     GNUNET_CONTAINER_DLL_remove (listener_head,
639                                  listener_tail,
640                                  listener);
641     GNUNET_free (listener);
642   }
643   GNUNET_free (cs);
644   num_clients--;
645   if ( (GNUNET_YES == in_shutdown) &&
646        (0 == num_clients) )
647   {
648     if (NULL != cadet)
649     {
650       GNUNET_CADET_disconnect (cadet);
651       cadet = NULL;
652     }
653   }
654 }
655
656
657 /**
658  * Check a request for a set operation from another peer.
659  *
660  * @param cls the operation state
661  * @param msg the received message
662  * @return #GNUNET_OK if the channel should be kept alive,
663  *         #GNUNET_SYSERR to destroy the channel
664  */
665 static int
666 check_incoming_msg (void *cls,
667                     const struct OperationRequestMessage *msg)
668 {
669   struct Operation *op = cls;
670   struct Listener *listener = op->listener;
671   const struct GNUNET_MessageHeader *nested_context;
672
673   /* double operation request */
674   if (0 != op->suggest_id)
675   {
676     GNUNET_break_op (0);
677     return GNUNET_SYSERR;
678   }
679   /* This should be equivalent to the previous condition, but can't hurt to check twice */
680   if (NULL == op->listener)
681   {
682     GNUNET_break (0);
683     return GNUNET_SYSERR;
684   }
685   if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
686   {
687     GNUNET_break_op (0);
688     return GNUNET_SYSERR;
689   }
690   nested_context = GNUNET_MQ_extract_nested_mh (msg);
691   if ( (NULL != nested_context) &&
692        (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
693   {
694     GNUNET_break_op (0);
695     return GNUNET_SYSERR;
696   }
697   return GNUNET_OK;
698 }
699
700
701 /**
702  * Handle a request for a set operation from another peer.  Checks if we
703  * have a listener waiting for such a request (and in that case initiates
704  * asking the listener about accepting the connection). If no listener
705  * is waiting, we queue the operation request in hope that a listener
706  * shows up soon (before timeout).
707  *
708  * This msg is expected as the first and only msg handled through the
709  * non-operation bound virtual table, acceptance of this operation replaces
710  * our virtual table and subsequent msgs would be routed differently (as
711  * we then know what type of operation this is).
712  *
713  * @param cls the operation state
714  * @param msg the received message
715  * @return #GNUNET_OK if the channel should be kept alive,
716  *         #GNUNET_SYSERR to destroy the channel
717  */
718 static void
719 handle_incoming_msg (void *cls,
720                      const struct OperationRequestMessage *msg)
721 {
722   struct Operation *op = cls;
723   struct Listener *listener = op->listener;
724   const struct GNUNET_MessageHeader *nested_context;
725   struct GNUNET_MQ_Envelope *env;
726   struct GNUNET_SET_RequestMessage *cmsg;
727
728   nested_context = GNUNET_MQ_extract_nested_mh (msg);
729   /* Make a copy of the nested_context (application-specific context
730      information that is opaque to set) so we can pass it to the
731      listener later on */
732   if (NULL != nested_context)
733     op->context_msg = GNUNET_copy_message (nested_context);
734   op->remote_element_count = ntohl (msg->element_count);
735   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736               "Received P2P operation request (op %u, port %s) for active listener\n",
737               (uint32_t) ntohl (msg->operation),
738               GNUNET_h2s (&op->listener->app_id));
739   GNUNET_assert (0 == op->suggest_id);
740   if (0 == suggest_id)
741     suggest_id++;
742   op->suggest_id = suggest_id++;
743   GNUNET_assert (NULL != op->timeout_task);
744   GNUNET_SCHEDULER_cancel (op->timeout_task);
745   op->timeout_task = NULL;
746   env = GNUNET_MQ_msg_nested_mh (cmsg,
747                                  GNUNET_MESSAGE_TYPE_SET_REQUEST,
748                                  op->context_msg);
749   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750               "Suggesting incoming request with accept id %u to listener %p of client %p\n",
751               op->suggest_id,
752               listener,
753               listener->cs);
754   cmsg->accept_id = htonl (op->suggest_id);
755   cmsg->peer_id = op->peer;
756   GNUNET_MQ_send (listener->cs->mq,
757                   env);
758   /* NOTE: GNUNET_CADET_receive_done() will be called in
759      #handle_client_accept() */
760 }
761
762
763 /**
764  * Add an element to @a set as specified by @a msg
765  *
766  * @param set set to manipulate
767  * @param msg message specifying the change
768  */
769 static void
770 execute_add (struct Set *set,
771              const struct GNUNET_SET_ElementMessage *msg)
772 {
773   struct GNUNET_SET_Element el;
774   struct ElementEntry *ee;
775   struct GNUNET_HashCode hash;
776
777   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
778   el.size = ntohs (msg->header.size) - sizeof (*msg);
779   el.data = &msg[1];
780   el.element_type = ntohs (msg->element_type);
781   GNUNET_SET_element_hash (&el,
782                            &hash);
783   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
784                                           &hash);
785   if (NULL == ee)
786   {
787     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788                 "Client inserts element %s of size %u\n",
789                 GNUNET_h2s (&hash),
790                 el.size);
791     ee = GNUNET_malloc (el.size + sizeof (*ee));
792     ee->element.size = el.size;
793     GNUNET_memcpy (&ee[1],
794             el.data,
795             el.size);
796     ee->element.data = &ee[1];
797     ee->element.element_type = el.element_type;
798     ee->remote = GNUNET_NO;
799     ee->mutations = NULL;
800     ee->mutations_size = 0;
801     ee->element_hash = hash;
802     GNUNET_break (GNUNET_YES ==
803                   GNUNET_CONTAINER_multihashmap_put (set->content->elements,
804                                                      &ee->element_hash,
805                                                      ee,
806                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
807   }
808   else if (GNUNET_YES ==
809            is_element_of_generation (ee,
810                                      set->current_generation,
811                                      set->excluded_generations,
812                                      set->excluded_generations_size))
813   {
814     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815                 "Client inserted element %s of size %u twice (ignored)\n",
816                 GNUNET_h2s (&hash),
817                 el.size);
818
819     /* same element inserted twice */
820     return;
821   }
822
823   {
824     struct MutationEvent mut = {
825       .generation = set->current_generation,
826       .added = GNUNET_YES
827     };
828     GNUNET_array_append (ee->mutations,
829                          ee->mutations_size,
830                          mut);
831   }
832   set->vt->add (set->state,
833                 ee);
834 }
835
836
837 /**
838  * Remove an element from @a set as specified by @a msg
839  *
840  * @param set set to manipulate
841  * @param msg message specifying the change
842  */
843 static void
844 execute_remove (struct Set *set,
845                 const struct GNUNET_SET_ElementMessage *msg)
846 {
847   struct GNUNET_SET_Element el;
848   struct ElementEntry *ee;
849   struct GNUNET_HashCode hash;
850
851   GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
852   el.size = ntohs (msg->header.size) - sizeof (*msg);
853   el.data = &msg[1];
854   el.element_type = ntohs (msg->element_type);
855   GNUNET_SET_element_hash (&el, &hash);
856   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
857                                           &hash);
858   if (NULL == ee)
859   {
860     /* Client tried to remove non-existing element. */
861     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862                 "Client removes non-existing element of size %u\n",
863                 el.size);
864     return;
865   }
866   if (GNUNET_NO ==
867       is_element_of_generation (ee,
868                                 set->current_generation,
869                                 set->excluded_generations,
870                                 set->excluded_generations_size))
871   {
872     /* Client tried to remove element twice */
873     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874                 "Client removed element of size %u twice (ignored)\n",
875                 el.size);
876     return;
877   }
878   else
879   {
880     struct MutationEvent mut = {
881       .generation = set->current_generation,
882       .added = GNUNET_NO
883     };
884
885     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886                 "Client removes element of size %u\n",
887                 el.size);
888
889     GNUNET_array_append (ee->mutations,
890                          ee->mutations_size,
891                          mut);
892   }
893   set->vt->remove (set->state,
894                    ee);
895 }
896
897
898 /**
899  * Perform a mutation on a set as specified by the @a msg
900  *
901  * @param set the set to mutate
902  * @param msg specification of what to change
903  */
904 static void
905 execute_mutation (struct Set *set,
906                   const struct GNUNET_SET_ElementMessage *msg)
907 {
908   switch (ntohs (msg->header.type))
909   {
910     case GNUNET_MESSAGE_TYPE_SET_ADD:
911       execute_add (set, msg);
912       break;
913     case GNUNET_MESSAGE_TYPE_SET_REMOVE:
914       execute_remove (set, msg);
915       break;
916     default:
917       GNUNET_break (0);
918   }
919 }
920
921
922 /**
923  * Execute mutations that were delayed on a set because of
924  * pending operations.
925  *
926  * @param set the set to execute mutations on
927  */
928 static void
929 execute_delayed_mutations (struct Set *set)
930 {
931   struct PendingMutation *pm;
932
933   if (0 != set->content->iterator_count)
934     return; /* still cannot do this */
935   while (NULL != (pm = set->content->pending_mutations_head))
936   {
937     GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
938                                  set->content->pending_mutations_tail,
939                                  pm);
940     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941                 "Executing pending mutation on %p.\n",
942                 pm->set);
943     execute_mutation (pm->set,
944                       pm->msg);
945     GNUNET_free (pm->msg);
946     GNUNET_free (pm);
947   }
948 }
949
950
951 /**
952  * Send the next element of a set to the set's client.  The next element is given by
953  * the set's current hashmap iterator.  The set's iterator will be set to NULL if there
954  * are no more elements in the set.  The caller must ensure that the set's iterator is
955  * valid.
956  *
957  * The client will acknowledge each received element with a
958  * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message.  Our
959  * #handle_client_iter_ack() will then trigger the next transmission.
960  * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
961  *
962  * @param set set that should send its next element to its client
963  */
964 static void
965 send_client_element (struct Set *set)
966 {
967   int ret;
968   struct ElementEntry *ee;
969   struct GNUNET_MQ_Envelope *ev;
970   struct GNUNET_SET_IterResponseMessage *msg;
971
972   GNUNET_assert (NULL != set->iter);
973   do {
974     ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
975                                                        NULL,
976                                                        (const void **) &ee);
977     if (GNUNET_NO == ret)
978     {
979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                   "Iteration on %p done.\n",
981                   set);
982       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
983       GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
984       set->iter = NULL;
985       set->iteration_id++;
986       GNUNET_assert (set->content->iterator_count > 0);
987       set->content->iterator_count--;
988       execute_delayed_mutations (set);
989       GNUNET_MQ_send (set->cs->mq,
990                       ev);
991       return;
992     }
993     GNUNET_assert (NULL != ee);
994   } while (GNUNET_NO ==
995            is_element_of_generation (ee,
996                                      set->iter_generation,
997                                      set->excluded_generations,
998                                      set->excluded_generations_size));
999   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000               "Sending iteration element on %p.\n",
1001               set);
1002   ev = GNUNET_MQ_msg_extra (msg,
1003                             ee->element.size,
1004                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1005   GNUNET_memcpy (&msg[1],
1006                  ee->element.data,
1007                  ee->element.size);
1008   msg->element_type = htons (ee->element.element_type);
1009   msg->iteration_id = htons (set->iteration_id);
1010   GNUNET_MQ_send (set->cs->mq,
1011                   ev);
1012 }
1013
1014
1015 /**
1016  * Called when a client wants to iterate the elements of a set.
1017  * Checks if we have a set associated with the client and if we
1018  * can right now start an iteration. If all checks out, starts
1019  * sending the elements of the set to the client.
1020  *
1021  * @param cls client that sent the message
1022  * @param m message sent by the client
1023  */
1024 static void
1025 handle_client_iterate (void *cls,
1026                        const struct GNUNET_MessageHeader *m)
1027 {
1028   struct ClientState *cs = cls;
1029   struct Set *set;
1030
1031   if (NULL == (set = cs->set))
1032   {
1033     /* attempt to iterate over a non existing set */
1034     GNUNET_break (0);
1035     GNUNET_SERVICE_client_drop (cs->client);
1036     return;
1037   }
1038   if (NULL != set->iter)
1039   {
1040     /* Only one concurrent iterate-action allowed per set */
1041     GNUNET_break (0);
1042     GNUNET_SERVICE_client_drop (cs->client);
1043     return;
1044   }
1045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046               "Iterating set %p in gen %u with %u content elements\n",
1047               (void *) set,
1048               set->current_generation,
1049               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1050   GNUNET_SERVICE_client_continue (cs->client);
1051   set->content->iterator_count++;
1052   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1053   set->iter_generation = set->current_generation;
1054   send_client_element (set);
1055 }
1056
1057
1058 /**
1059  * Called when a client wants to create a new set.  This is typically
1060  * the first request from a client, and includes the type of set
1061  * operation to be performed.
1062  *
1063  * @param cls client that sent the message
1064  * @param m message sent by the client
1065  */
1066 static void
1067 handle_client_create_set (void *cls,
1068                           const struct GNUNET_SET_CreateMessage *msg)
1069 {
1070   struct ClientState *cs = cls;
1071   struct Set *set;
1072
1073   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074               "Client created new set (operation %u)\n",
1075               (uint32_t) ntohl (msg->operation));
1076   if (NULL != cs->set)
1077   {
1078     /* There can only be one set per client */
1079     GNUNET_break (0);
1080     GNUNET_SERVICE_client_drop (cs->client);
1081     return;
1082   }
1083   set = GNUNET_new (struct Set);
1084   switch (ntohl (msg->operation))
1085   {
1086   case GNUNET_SET_OPERATION_INTERSECTION:
1087     set->vt = _GSS_intersection_vt ();
1088     break;
1089   case GNUNET_SET_OPERATION_UNION:
1090     set->vt = _GSS_union_vt ();
1091     break;
1092   default:
1093     GNUNET_free (set);
1094     GNUNET_break (0);
1095     GNUNET_SERVICE_client_drop (cs->client);
1096     return;
1097   }
1098   set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1099   set->state = set->vt->create ();
1100   if (NULL == set->state)
1101   {
1102     /* initialization failed (i.e. out of memory) */
1103     GNUNET_free (set);
1104     GNUNET_SERVICE_client_drop (cs->client);
1105     return;
1106   }
1107   set->content = GNUNET_new (struct SetContent);
1108   set->content->refcount = 1;
1109   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1110                                                                  GNUNET_YES);
1111   set->cs = cs;
1112   cs->set = set;
1113   GNUNET_SERVICE_client_continue (cs->client);
1114 }
1115
1116
1117 /**
1118  * Timeout happens iff:
1119  *  - we suggested an operation to our listener,
1120  *    but did not receive a response in time
1121  *  - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1122  *
1123  * @param cls channel context
1124  * @param tc context information (why was this task triggered now)
1125  */
1126 static void
1127 incoming_timeout_cb (void *cls)
1128 {
1129   struct Operation *op = cls;
1130
1131   op->timeout_task = NULL;
1132   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133               "Remote peer's incoming request timed out\n");
1134   incoming_destroy (op);
1135 }
1136
1137
1138 /**
1139  * Method called whenever another peer has added us to a channel the
1140  * other peer initiated.  Only called (once) upon reception of data
1141  * from a channel we listen on.
1142  *
1143  * The channel context represents the operation itself and gets added
1144  * to a DLL, from where it gets looked up when our local listener
1145  * client responds to a proposed/suggested operation or connects and
1146  * associates with this operation.
1147  *
1148  * @param cls closure
1149  * @param channel new handle to the channel
1150  * @param source peer that started the channel
1151  * @return initial channel context for the channel
1152  *         returns NULL on error
1153  */
1154 static void *
1155 channel_new_cb (void *cls,
1156                 struct GNUNET_CADET_Channel *channel,
1157                 const struct GNUNET_PeerIdentity *source)
1158 {
1159   struct Listener *listener = cls;
1160   struct Operation *op;
1161
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163               "New incoming channel\n");
1164   op = GNUNET_new (struct Operation);
1165   op->listener = listener;
1166   op->peer = *source;
1167   op->channel = channel;
1168   op->mq = GNUNET_CADET_get_mq (op->channel);
1169   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1170                                        UINT32_MAX);
1171   op->timeout_task
1172     = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1173                                     &incoming_timeout_cb,
1174                                     op);
1175   GNUNET_CONTAINER_DLL_insert (listener->op_head,
1176                                listener->op_tail,
1177                                op);
1178   return op;
1179 }
1180
1181
1182 /**
1183  * Function called whenever a channel is destroyed.  Should clean up
1184  * any associated state.  It must NOT call
1185  * GNUNET_CADET_channel_destroy() on the channel.
1186  *
1187  * The peer_disconnect function is part of a a virtual table set initially either
1188  * when a peer creates a new channel with us, or once we create
1189  * a new channel ourselves (evaluate).
1190  *
1191  * Once we know the exact type of operation (union/intersection), the vt is
1192  * replaced with an operation specific instance (_GSS_[op]_vt).
1193  *
1194  * @param channel_ctx place where local state associated
1195  *                   with the channel is stored
1196  * @param channel connection to the other end (henceforth invalid)
1197  */
1198 static void
1199 channel_end_cb (void *channel_ctx,
1200                 const struct GNUNET_CADET_Channel *channel)
1201 {
1202   struct Operation *op = channel_ctx;
1203
1204   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205               "channel_end_cb called\n");
1206   op->channel = NULL;
1207   if (NULL != op->listener)
1208     incoming_destroy (op);
1209   else if (NULL != op->set)
1210     op->set->vt->channel_death (op);
1211   else
1212     _GSS_operation_destroy (op,
1213                             GNUNET_YES);
1214   GNUNET_free (op);
1215 }
1216
1217
1218 /**
1219  * Function called whenever an MQ-channel's transmission window size changes.
1220  *
1221  * The first callback in an outgoing channel will be with a non-zero value
1222  * and will mean the channel is connected to the destination.
1223  *
1224  * For an incoming channel it will be called immediately after the
1225  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1226  *
1227  * @param cls Channel closure.
1228  * @param channel Connection to the other end (henceforth invalid).
1229  * @param window_size New window size. If the is more messages than buffer size
1230  *                    this value will be negative..
1231  */
1232 static void
1233 channel_window_cb (void *cls,
1234                    const struct GNUNET_CADET_Channel *channel,
1235                    int window_size)
1236 {
1237   /* FIXME: not implemented, we could do flow control here... */
1238 }
1239
1240
1241 /**
1242  * Called when a client wants to create a new listener.
1243  *
1244  * @param cls client that sent the message
1245  * @param msg message sent by the client
1246  */
1247 static void
1248 handle_client_listen (void *cls,
1249                       const struct GNUNET_SET_ListenMessage *msg)
1250 {
1251   struct ClientState *cs = cls;
1252   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1253     GNUNET_MQ_hd_var_size (incoming_msg,
1254                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1255                            struct OperationRequestMessage,
1256                            NULL),
1257     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1258                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1259                            struct IBFMessage,
1260                            NULL),
1261     GNUNET_MQ_hd_var_size (union_p2p_elements,
1262                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1263                            struct GNUNET_SET_ElementMessage,
1264                            NULL),
1265     GNUNET_MQ_hd_var_size (union_p2p_offer,
1266                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1267                            struct GNUNET_MessageHeader,
1268                            NULL),
1269     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1270                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1271                            struct InquiryMessage,
1272                            NULL),
1273     GNUNET_MQ_hd_var_size (union_p2p_demand,
1274                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1275                            struct GNUNET_MessageHeader,
1276                            NULL),
1277     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1278                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1279                              struct GNUNET_MessageHeader,
1280                              NULL),
1281     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1282                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1283                              struct GNUNET_MessageHeader,
1284                              NULL),
1285     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1286                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1287                              struct GNUNET_MessageHeader,
1288                              NULL),
1289     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1290                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1291                              struct GNUNET_MessageHeader,
1292                              NULL),
1293     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1294                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1295                            struct StrataEstimatorMessage,
1296                            NULL),
1297     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1298                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1299                            struct StrataEstimatorMessage,
1300                            NULL),
1301     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1302                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1303                            struct GNUNET_SET_ElementMessage,
1304                            NULL),
1305     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1306                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1307                              struct IntersectionElementInfoMessage,
1308                              NULL),
1309     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1310                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1311                            struct BFMessage,
1312                            NULL),
1313     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1314                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1315                              struct IntersectionDoneMessage,
1316                              NULL),
1317     GNUNET_MQ_handler_end ()
1318   };
1319   struct Listener *listener;
1320
1321   if (NULL != cs->listener)
1322   {
1323     /* max. one active listener per client! */
1324     GNUNET_break (0);
1325     GNUNET_SERVICE_client_drop (cs->client);
1326     return;
1327   }
1328   listener = GNUNET_new (struct Listener);
1329   listener->cs = cs;
1330   cs->listener = listener;
1331   listener->app_id = msg->app_id;
1332   listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1333   GNUNET_CONTAINER_DLL_insert (listener_head,
1334                                listener_tail,
1335                                listener);
1336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337               "New listener created (op %u, port %s)\n",
1338               listener->operation,
1339               GNUNET_h2s (&listener->app_id));
1340   listener->open_port
1341     = GNUNET_CADET_open_port (cadet,
1342                               &msg->app_id,
1343                               &channel_new_cb,
1344                               listener,
1345                               &channel_window_cb,
1346                               &channel_end_cb,
1347                               cadet_handlers);
1348   GNUNET_SERVICE_client_continue (cs->client);
1349 }
1350
1351
1352 /**
1353  * Called when the listening client rejects an operation
1354  * request by another peer.
1355  *
1356  * @param cls client that sent the message
1357  * @param msg message sent by the client
1358  */
1359 static void
1360 handle_client_reject (void *cls,
1361                       const struct GNUNET_SET_RejectMessage *msg)
1362 {
1363   struct ClientState *cs = cls;
1364   struct Operation *op;
1365
1366   op = get_incoming (ntohl (msg->accept_reject_id));
1367   if (NULL == op)
1368   {
1369     /* no matching incoming operation for this reject;
1370        could be that the other peer already disconnected... */
1371     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1372                 "Client rejected unknown operation %u\n",
1373                 (unsigned int) ntohl (msg->accept_reject_id));
1374     GNUNET_SERVICE_client_continue (cs->client);
1375     return;
1376   }
1377   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1378               "Peer request (op %u, app %s) rejected by client\n",
1379               op->listener->operation,
1380               GNUNET_h2s (&cs->listener->app_id));
1381   GNUNET_CADET_channel_destroy (op->channel);
1382   GNUNET_SERVICE_client_continue (cs->client);
1383 }
1384
1385
1386 /**
1387  * Called when a client wants to add or remove an element to a set it inhabits.
1388  *
1389  * @param cls client that sent the message
1390  * @param msg message sent by the client
1391  */
1392 static int
1393 check_client_mutation (void *cls,
1394                        const struct GNUNET_SET_ElementMessage *msg)
1395 {
1396   /* NOTE: Technically, we should probably check with the
1397      block library whether the element we are given is well-formed */
1398   return GNUNET_OK;
1399 }
1400
1401
1402 /**
1403  * Called when a client wants to add or remove an element to a set it inhabits.
1404  *
1405  * @param cls client that sent the message
1406  * @param msg message sent by the client
1407  */
1408 static void
1409 handle_client_mutation (void *cls,
1410                         const struct GNUNET_SET_ElementMessage *msg)
1411 {
1412   struct ClientState *cs = cls;
1413   struct Set *set;
1414
1415   if (NULL == (set = cs->set))
1416   {
1417     /* client without a set requested an operation */
1418     GNUNET_break (0);
1419     GNUNET_SERVICE_client_drop (cs->client);
1420     return;
1421   }
1422   GNUNET_SERVICE_client_continue (cs->client);
1423
1424   if (0 != set->content->iterator_count)
1425   {
1426     struct PendingMutation *pm;
1427
1428     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429                 "Scheduling mutation on set\n");
1430     pm = GNUNET_new (struct PendingMutation);
1431     pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1432     pm->set = set;
1433     GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1434                                       set->content->pending_mutations_tail,
1435                                       pm);
1436     return;
1437   }
1438   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439               "Executing mutation on set\n");
1440   execute_mutation (set,
1441                     msg);
1442 }
1443
1444
1445 /**
1446  * Advance the current generation of a set,
1447  * adding exclusion ranges if necessary.
1448  *
1449  * @param set the set where we want to advance the generation
1450  */
1451 static void
1452 advance_generation (struct Set *set)
1453 {
1454   struct GenerationRange r;
1455
1456   if (set->current_generation == set->content->latest_generation)
1457   {
1458     set->content->latest_generation++;
1459     set->current_generation++;
1460     return;
1461   }
1462
1463   GNUNET_assert (set->current_generation < set->content->latest_generation);
1464
1465   r.start = set->current_generation + 1;
1466   r.end = set->content->latest_generation + 1;
1467   set->content->latest_generation = r.end;
1468   set->current_generation = r.end;
1469   GNUNET_array_append (set->excluded_generations,
1470                        set->excluded_generations_size,
1471                        r);
1472 }
1473
1474
1475 /**
1476  * Called when a client wants to initiate a set operation with another
1477  * peer.  Initiates the CADET connection to the listener and sends the
1478  * request.
1479  *
1480  * @param cls client that sent the message
1481  * @param msg message sent by the client
1482  * @return #GNUNET_OK if the message is well-formed
1483  */
1484 static int
1485 check_client_evaluate (void *cls,
1486                         const struct GNUNET_SET_EvaluateMessage *msg)
1487 {
1488   /* FIXME: suboptimal, even if the context below could be NULL,
1489      there are malformed messages this does not check for... */
1490   return GNUNET_OK;
1491 }
1492
1493
1494 /**
1495  * Called when a client wants to initiate a set operation with another
1496  * peer.  Initiates the CADET connection to the listener and sends the
1497  * request.
1498  *
1499  * @param cls client that sent the message
1500  * @param msg message sent by the client
1501  */
1502 static void
1503 handle_client_evaluate (void *cls,
1504                         const struct GNUNET_SET_EvaluateMessage *msg)
1505 {
1506   struct ClientState *cs = cls;
1507   struct Operation *op = GNUNET_new (struct Operation);
1508   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1509     GNUNET_MQ_hd_var_size (incoming_msg,
1510                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1511                            struct OperationRequestMessage,
1512                            op),
1513     GNUNET_MQ_hd_var_size (union_p2p_ibf,
1514                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1515                            struct IBFMessage,
1516                            op),
1517     GNUNET_MQ_hd_var_size (union_p2p_elements,
1518                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1519                            struct GNUNET_SET_ElementMessage,
1520                            op),
1521     GNUNET_MQ_hd_var_size (union_p2p_offer,
1522                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1523                            struct GNUNET_MessageHeader,
1524                            op),
1525     GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1526                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1527                            struct InquiryMessage,
1528                            op),
1529     GNUNET_MQ_hd_var_size (union_p2p_demand,
1530                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1531                            struct GNUNET_MessageHeader,
1532                            op),
1533     GNUNET_MQ_hd_fixed_size (union_p2p_done,
1534                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1535                              struct GNUNET_MessageHeader,
1536                              op),
1537     GNUNET_MQ_hd_fixed_size (union_p2p_over,
1538                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1539                              struct GNUNET_MessageHeader,
1540                              op),
1541     GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1542                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1543                              struct GNUNET_MessageHeader,
1544                              op),
1545     GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1546                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1547                              struct GNUNET_MessageHeader,
1548                              op),
1549     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1550                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1551                            struct StrataEstimatorMessage,
1552                            op),
1553     GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1554                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1555                            struct StrataEstimatorMessage,
1556                            op),
1557     GNUNET_MQ_hd_var_size (union_p2p_full_element,
1558                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1559                            struct GNUNET_SET_ElementMessage,
1560                            op),
1561     GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1562                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1563                              struct IntersectionElementInfoMessage,
1564                              op),
1565     GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1566                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1567                            struct BFMessage,
1568                            op),
1569     GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1570                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1571                              struct IntersectionDoneMessage,
1572                              op),
1573     GNUNET_MQ_handler_end ()
1574   };
1575   struct Set *set;
1576   const struct GNUNET_MessageHeader *context;
1577
1578   if (NULL == (set = cs->set))
1579   {
1580     GNUNET_break (0);
1581     GNUNET_free (op);
1582     GNUNET_SERVICE_client_drop (cs->client);
1583     return;
1584   }
1585   op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1586                                        UINT32_MAX);
1587   op->peer = msg->target_peer;
1588   op->result_mode = ntohl (msg->result_mode);
1589   op->client_request_id = ntohl (msg->request_id);
1590   op->byzantine = msg->byzantine;
1591   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1592   op->force_full = msg->force_full;
1593   op->force_delta = msg->force_delta;
1594   context = GNUNET_MQ_extract_nested_mh (msg);
1595
1596   /* Advance generation values, so that
1597      mutations won't interfer with the running operation. */
1598   op->set = set;
1599   op->generation_created = set->current_generation;
1600   advance_generation (set);
1601   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1602                                set->ops_tail,
1603                                op);
1604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1605               "Creating new CADET channel to port %s for set operation type %u\n",
1606               GNUNET_h2s (&msg->app_id),
1607               set->operation);
1608   op->channel = GNUNET_CADET_channel_create (cadet,
1609                                              op,
1610                                              &msg->target_peer,
1611                                              &msg->app_id,
1612                                              GNUNET_CADET_OPTION_RELIABLE,
1613                                              &channel_window_cb,
1614                                              &channel_end_cb,
1615                                              cadet_handlers);
1616   op->mq = GNUNET_CADET_get_mq (op->channel);
1617   op->state = set->vt->evaluate (op,
1618                                  context);
1619   if (NULL == op->state)
1620   {
1621     GNUNET_break (0);
1622     GNUNET_SERVICE_client_drop (cs->client);
1623     return;
1624   }
1625   GNUNET_SERVICE_client_continue (cs->client);
1626 }
1627
1628
1629 /**
1630  * Handle an ack from a client, and send the next element. Note
1631  * that we only expect acks for set elements, not after the
1632  * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1633  *
1634  * @param cls client the client
1635  * @param ack the message
1636  */
1637 static void
1638 handle_client_iter_ack (void *cls,
1639                         const struct GNUNET_SET_IterAckMessage *ack)
1640 {
1641   struct ClientState *cs = cls;
1642   struct Set *set;
1643
1644   if (NULL == (set = cs->set))
1645   {
1646     /* client without a set acknowledged receiving a value */
1647     GNUNET_break (0);
1648     GNUNET_SERVICE_client_drop (cs->client);
1649     return;
1650   }
1651   if (NULL == set->iter)
1652   {
1653     /* client sent an ack, but we were not expecting one (as
1654        set iteration has finished) */
1655     GNUNET_break (0);
1656     GNUNET_SERVICE_client_drop (cs->client);
1657     return;
1658   }
1659   GNUNET_SERVICE_client_continue (cs->client);
1660   if (ntohl (ack->send_more))
1661   {
1662     send_client_element (set);
1663   }
1664   else
1665   {
1666     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1667     set->iter = NULL;
1668     set->iteration_id++;
1669   }
1670 }
1671
1672
1673 /**
1674  * Handle a request from the client to copy a set.
1675  *
1676  * @param cls the client
1677  * @param mh the message
1678  */
1679 static void
1680 handle_client_copy_lazy_prepare (void *cls,
1681                                  const struct GNUNET_MessageHeader *mh)
1682 {
1683   struct ClientState *cs = cls;
1684   struct Set *set;
1685   struct LazyCopyRequest *cr;
1686   struct GNUNET_MQ_Envelope *ev;
1687   struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1688
1689   if (NULL == (set = cs->set))
1690   {
1691     /* client without a set requested an operation */
1692     GNUNET_break (0);
1693     GNUNET_SERVICE_client_drop (cs->client);
1694     return;
1695   }
1696   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697               "Client requested creation of lazy copy\n");
1698   cr = GNUNET_new (struct LazyCopyRequest);
1699   cr->cookie = ++lazy_copy_cookie;
1700   cr->source_set = set;
1701   GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1702                                lazy_copy_tail,
1703                                cr);
1704   ev = GNUNET_MQ_msg (resp_msg,
1705                       GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1706   resp_msg->cookie = cr->cookie;
1707   GNUNET_MQ_send (set->cs->mq,
1708                   ev);
1709   GNUNET_SERVICE_client_continue (cs->client);
1710 }
1711
1712
1713 /**
1714  * Handle a request from the client to connect to a copy of a set.
1715  *
1716  * @param cls the client
1717  * @param msg the message
1718  */
1719 static void
1720 handle_client_copy_lazy_connect (void *cls,
1721                                  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1722 {
1723   struct ClientState *cs = cls;
1724   struct LazyCopyRequest *cr;
1725   struct Set *set;
1726   int found;
1727
1728   if (NULL != cs->set)
1729   {
1730     /* There can only be one set per client */
1731     GNUNET_break (0);
1732     GNUNET_SERVICE_client_drop (cs->client);
1733     return;
1734   }
1735   found = GNUNET_NO;
1736   for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1737   {
1738     if (cr->cookie == msg->cookie)
1739     {
1740       found = GNUNET_YES;
1741       break;
1742     }
1743   }
1744   if (GNUNET_NO == found)
1745   {
1746     /* client asked for copy with cookie we don't know */
1747     GNUNET_break (0);
1748     GNUNET_SERVICE_client_drop (cs->client);
1749     return;
1750   }
1751   GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1752                                lazy_copy_tail,
1753                                cr);
1754   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1755               "Client %p requested use of lazy copy\n",
1756               cs);
1757   set = GNUNET_new (struct Set);
1758   switch (cr->source_set->operation)
1759   {
1760   case GNUNET_SET_OPERATION_INTERSECTION:
1761     set->vt = _GSS_intersection_vt ();
1762     break;
1763   case GNUNET_SET_OPERATION_UNION:
1764     set->vt = _GSS_union_vt ();
1765     break;
1766   default:
1767     GNUNET_assert (0);
1768     return;
1769   }
1770
1771   if (NULL == set->vt->copy_state)
1772   {
1773     /* Lazy copy not supported for this set operation */
1774     GNUNET_break (0);
1775     GNUNET_free (set);
1776     GNUNET_free (cr);
1777     GNUNET_SERVICE_client_drop (cs->client);
1778     return;
1779   }
1780
1781   set->operation = cr->source_set->operation;
1782   set->state = set->vt->copy_state (cr->source_set->state);
1783   set->content = cr->source_set->content;
1784   set->content->refcount++;
1785
1786   set->current_generation = cr->source_set->current_generation;
1787   set->excluded_generations_size = cr->source_set->excluded_generations_size;
1788   set->excluded_generations
1789     = GNUNET_memdup (cr->source_set->excluded_generations,
1790                      set->excluded_generations_size * sizeof (struct GenerationRange));
1791
1792   /* Advance the generation of the new set, so that mutations to the
1793      of the cloned set and the source set are independent. */
1794   advance_generation (set);
1795   set->cs = cs;
1796   cs->set = set;
1797   GNUNET_free (cr);
1798   GNUNET_SERVICE_client_continue (cs->client);
1799 }
1800
1801
1802 /**
1803  * Handle a request from the client to cancel a running set operation.
1804  *
1805  * @param cls the client
1806  * @param msg the message
1807  */
1808 static void
1809 handle_client_cancel (void *cls,
1810                       const struct GNUNET_SET_CancelMessage *msg)
1811 {
1812   struct ClientState *cs = cls;
1813   struct Set *set;
1814   struct Operation *op;
1815   int found;
1816
1817   if (NULL == (set = cs->set))
1818   {
1819     /* client without a set requested an operation */
1820     GNUNET_break (0);
1821     GNUNET_SERVICE_client_drop (cs->client);
1822     return;
1823   }
1824   found = GNUNET_NO;
1825   for (op = set->ops_head; NULL != op; op = op->next)
1826   {
1827     if (op->client_request_id == ntohl (msg->request_id))
1828     {
1829       found = GNUNET_YES;
1830       break;
1831     }
1832   }
1833   if (GNUNET_NO == found)
1834   {
1835     /* It may happen that the operation was already destroyed due to
1836      * the other peer disconnecting.  The client may not know about this
1837      * yet and try to cancel the (just barely non-existent) operation.
1838      * So this is not a hard error.
1839      */
1840     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1841                 "Client canceled non-existent op %u\n",
1842                 (uint32_t) ntohl (msg->request_id));
1843   }
1844   else
1845   {
1846     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847                 "Client requested cancel for op %u\n",
1848                 (uint32_t) ntohl (msg->request_id));
1849     _GSS_operation_destroy (op,
1850                             GNUNET_YES);
1851   }
1852   GNUNET_SERVICE_client_continue (cs->client);
1853 }
1854
1855
1856 /**
1857  * Handle a request from the client to accept a set operation that
1858  * came from a remote peer.  We forward the accept to the associated
1859  * operation for handling
1860  *
1861  * @param cls the client
1862  * @param msg the message
1863  */
1864 static void
1865 handle_client_accept (void *cls,
1866                       const struct GNUNET_SET_AcceptMessage *msg)
1867 {
1868   struct ClientState *cs = cls;
1869   struct Set *set;
1870   struct Operation *op;
1871   struct GNUNET_SET_ResultMessage *result_message;
1872   struct GNUNET_MQ_Envelope *ev;
1873   struct Listener *listener;
1874
1875   if (NULL == (set = cs->set))
1876   {
1877     /* client without a set requested to accept */
1878     GNUNET_break (0);
1879     GNUNET_SERVICE_client_drop (cs->client);
1880     return;
1881   }
1882   op = get_incoming (ntohl (msg->accept_reject_id));
1883   if (NULL == op)
1884   {
1885     /* It is not an error if the set op does not exist -- it may
1886      * have been destroyed when the partner peer disconnected. */
1887     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1888                 "Client %p accepted request %u of listener %p that is no longer active\n",
1889                 cs,
1890                 ntohl (msg->accept_reject_id),
1891                 cs->listener);
1892     ev = GNUNET_MQ_msg (result_message,
1893                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1894     result_message->request_id = msg->request_id;
1895     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1896     GNUNET_MQ_send (set->cs->mq,
1897                     ev);
1898     GNUNET_SERVICE_client_continue (cs->client);
1899     return;
1900   }
1901   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1902               "Client accepting request %u\n",
1903               (uint32_t) ntohl (msg->accept_reject_id));
1904   listener = op->listener;
1905   op->listener = NULL;
1906   GNUNET_CONTAINER_DLL_remove (listener->op_head,
1907                                listener->op_tail,
1908                                op);
1909   op->set = set;
1910   GNUNET_CONTAINER_DLL_insert (set->ops_head,
1911                                set->ops_tail,
1912                                op);
1913   op->client_request_id = ntohl (msg->request_id);
1914   op->result_mode = ntohl (msg->result_mode);
1915   op->byzantine = msg->byzantine;
1916   op->byzantine_lower_bound = msg->byzantine_lower_bound;
1917   op->force_full = msg->force_full;
1918   op->force_delta = msg->force_delta;
1919
1920   /* Advance generation values, so that future mutations do not
1921      interfer with the running operation. */
1922   op->generation_created = set->current_generation;
1923   advance_generation (set);
1924   GNUNET_assert (NULL == op->state);
1925   op->state = set->vt->accept (op);
1926   if (NULL == op->state)
1927   {
1928     GNUNET_break (0);
1929     GNUNET_SERVICE_client_drop (cs->client);
1930     return;
1931   }
1932   /* Now allow CADET to continue, as we did not do this in
1933      #handle_incoming_msg (as we wanted to first see if the
1934      local client would accept the request). */
1935   GNUNET_CADET_receive_done (op->channel);
1936   GNUNET_SERVICE_client_continue (cs->client);
1937 }
1938
1939
1940 /**
1941  * Called to clean up, after a shutdown has been requested.
1942  *
1943  * @param cls closure, NULL
1944  */
1945 static void
1946 shutdown_task (void *cls)
1947 {
1948   /* Delay actual shutdown to allow service to disconnect clients */
1949   in_shutdown = GNUNET_YES;
1950   if (0 == num_clients)
1951   {
1952     if (NULL != cadet)
1953     {
1954       GNUNET_CADET_disconnect (cadet);
1955       cadet = NULL;
1956     }
1957   }
1958   GNUNET_STATISTICS_destroy (_GSS_statistics,
1959                              GNUNET_YES);
1960   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1961               "handled shutdown request\n");
1962 }
1963
1964
1965 /**
1966  * Function called by the service's run
1967  * method to run service-specific setup code.
1968  *
1969  * @param cls closure
1970  * @param cfg configuration to use
1971  * @param service the initialized service
1972  */
1973 static void
1974 run (void *cls,
1975      const struct GNUNET_CONFIGURATION_Handle *cfg,
1976      struct GNUNET_SERVICE_Handle *service)
1977 {
1978   /* FIXME: need to modify SERVICE (!) API to allow
1979      us to run a shutdown task *after* clients were
1980      forcefully disconnected! */
1981   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1982                                  NULL);
1983   _GSS_statistics = GNUNET_STATISTICS_create ("set",
1984                                               cfg);
1985   cadet = GNUNET_CADET_connect (cfg);
1986   if (NULL == cadet)
1987   {
1988     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1989                 _("Could not connect to CADET service\n"));
1990     GNUNET_SCHEDULER_shutdown ();
1991     return;
1992   }
1993 }
1994
1995
1996 /**
1997  * Define "main" method using service macro.
1998  */
1999 GNUNET_SERVICE_MAIN
2000 ("set",
2001  GNUNET_SERVICE_OPTION_NONE,
2002  &run,
2003  &client_connect_cb,
2004  &client_disconnect_cb,
2005  NULL,
2006  GNUNET_MQ_hd_fixed_size (client_accept,
2007                           GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2008                           struct GNUNET_SET_AcceptMessage,
2009                           NULL),
2010  GNUNET_MQ_hd_fixed_size (client_iter_ack,
2011                           GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2012                           struct GNUNET_SET_IterAckMessage,
2013                           NULL),
2014  GNUNET_MQ_hd_var_size (client_mutation,
2015                         GNUNET_MESSAGE_TYPE_SET_ADD,
2016                         struct GNUNET_SET_ElementMessage,
2017                         NULL),
2018  GNUNET_MQ_hd_fixed_size (client_create_set,
2019                           GNUNET_MESSAGE_TYPE_SET_CREATE,
2020                           struct GNUNET_SET_CreateMessage,
2021                           NULL),
2022  GNUNET_MQ_hd_fixed_size (client_iterate,
2023                           GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2024                           struct GNUNET_MessageHeader,
2025                           NULL),
2026  GNUNET_MQ_hd_var_size (client_evaluate,
2027                         GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2028                         struct GNUNET_SET_EvaluateMessage,
2029                         NULL),
2030  GNUNET_MQ_hd_fixed_size (client_listen,
2031                           GNUNET_MESSAGE_TYPE_SET_LISTEN,
2032                           struct GNUNET_SET_ListenMessage,
2033                           NULL),
2034  GNUNET_MQ_hd_fixed_size (client_reject,
2035                           GNUNET_MESSAGE_TYPE_SET_REJECT,
2036                           struct GNUNET_SET_RejectMessage,
2037                           NULL),
2038  GNUNET_MQ_hd_var_size (client_mutation,
2039                         GNUNET_MESSAGE_TYPE_SET_REMOVE,
2040                         struct GNUNET_SET_ElementMessage,
2041                         NULL),
2042  GNUNET_MQ_hd_fixed_size (client_cancel,
2043                           GNUNET_MESSAGE_TYPE_SET_CANCEL,
2044                           struct GNUNET_SET_CancelMessage,
2045                           NULL),
2046  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2047                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2048                           struct GNUNET_MessageHeader,
2049                           NULL),
2050  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2051                           GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2052                           struct GNUNET_SET_CopyLazyConnectMessage,
2053                           NULL),
2054  GNUNET_MQ_handler_end ());
2055
2056
2057 /* end of gnunet-service-set.c */